You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2018/05/28 09:21:28 UTC
[1/3] flink git commit: [hotfix][tests] Report failure with error
level instead of debug
Repository: flink
Updated Branches:
refs/heads/master 6d002941f -> a5fa09312
[hotfix][tests] Report failure with error level instead of debug
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62bf8fd4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62bf8fd4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62bf8fd4
Branch: refs/heads/master
Commit: 62bf8fd4813271c64afbb1509b31811c79246bce
Parents: 6d00294
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon May 14 13:46:08 2018 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon May 28 11:21:04 2018 +0200
----------------------------------------------------------------------
.../rest/handler/taskmanager/AbstractTaskManagerFileHandler.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/62bf8fd4/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 5b5d97d..83fab69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -155,7 +155,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
resultFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
- log.debug("Failed to transfer file from TaskExecutor {}.", taskManagerId, throwable);
+ log.error("Failed to transfer file from TaskExecutor {}.", taskManagerId, throwable);
fileBlobKeys.invalidate(taskManagerId);
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
[2/3] flink git commit: [FLINK-9386] Embed netty router
Posted by uc...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
new file mode 100644
index 0000000..2e8200e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Inbound handler that converts HttpRequest to Routed and passes Routed to the matched handler.
+ *
+ * <p>This class replaces the standard error response to be identical with those sent by the {@link AbstractRestHandler}.
+ *
+ * <p>This class is based on:
+ * https://github.com/sinetja/netty-router/blob/1.10/src/main/java/io/netty/handler/codec/http/router/AbstractHandler.java
+ * https://github.com/sinetja/netty-router/blob/1.10/src/main/java/io/netty/handler/codec/http/router/Handler.java
+ */
+public class RouterHandler extends SimpleChannelInboundHandler<HttpRequest> {
+ private static final String ROUTER_HANDLER_NAME = RouterHandler.class.getName() + "_ROUTER_HANDLER";
+ private static final String ROUTED_HANDLER_NAME = RouterHandler.class.getName() + "_ROUTED_HANDLER";
+
+ private final Map<String, String> responseHeaders;
+ private final Router router;
+
+ public RouterHandler(Router router, final Map<String, String> responseHeaders) {
+ this.router = requireNonNull(router);
+ this.responseHeaders = requireNonNull(responseHeaders);
+ }
+
+ public String getName() {
+ return ROUTER_HANDLER_NAME;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) {
+ if (HttpHeaders.is100ContinueExpected(httpRequest)) {
+ channelHandlerContext.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
+ return;
+ }
+
+ // Route
+ HttpMethod method = httpRequest.getMethod();
+ QueryStringDecoder qsd = new QueryStringDecoder(httpRequest.getUri());
+ RouteResult<?> routeResult = router.route(method, qsd.path(), qsd.parameters());
+
+ if (routeResult == null) {
+ respondNotFound(channelHandlerContext, httpRequest);
+ return;
+ }
+
+ routed(channelHandlerContext, routeResult, httpRequest);
+ }
+
+ private void routed(
+ ChannelHandlerContext channelHandlerContext,
+ RouteResult<?> routeResult,
+ HttpRequest httpRequest) {
+ ChannelInboundHandler handler = (ChannelInboundHandler) routeResult.target();
+
+ // The handler may have been added (keep alive)
+ ChannelPipeline pipeline = channelHandlerContext.pipeline();
+ ChannelHandler addedHandler = pipeline.get(ROUTED_HANDLER_NAME);
+ if (handler != addedHandler) {
+ if (addedHandler == null) {
+ pipeline.addAfter(ROUTER_HANDLER_NAME, ROUTED_HANDLER_NAME, handler);
+ } else {
+ pipeline.replace(addedHandler, ROUTED_HANDLER_NAME, handler);
+ }
+ }
+
+ RoutedRequest<?> request = new RoutedRequest<>(routeResult, httpRequest);
+ channelHandlerContext.fireChannelRead(request.retain());
+ }
+
+ private void respondNotFound(ChannelHandlerContext channelHandlerContext, HttpRequest request) {
+ HandlerUtils.sendErrorResponse(
+ channelHandlerContext,
+ request,
+ new ErrorResponseBody("Not found."),
+ HttpResponseStatus.NOT_FOUND,
+ responseHeaders);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/KeepAliveWrite.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/KeepAliveWrite.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/KeepAliveWrite.java
new file mode 100644
index 0000000..906613c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/KeepAliveWrite.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+
+/**
+ * Utilities to write.
+ */
+public class KeepAliveWrite {
+ public static ChannelFuture flush(ChannelHandlerContext ctx, HttpRequest request, HttpResponse response) {
+ if (!HttpHeaders.isKeepAlive(request)) {
+ return ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ } else {
+ response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ return ctx.writeAndFlush(response);
+ }
+ }
+
+ public static ChannelFuture flush(Channel ch, HttpRequest req, HttpResponse res) {
+ if (!HttpHeaders.isKeepAlive(req)) {
+ return ch.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
+ } else {
+ res.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ return ch.writeAndFlush(res);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
index b600cbe..2b0291f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -36,7 +38,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.junit.Assert;
import org.junit.Test;
@@ -134,11 +135,16 @@ public class TaskManagerLogHandlerTest {
Map<String, String> pathParams = new HashMap<>();
pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, tmID.toString());
- Routed routed = mock(Routed.class);
- when(routed.pathParams()).thenReturn(pathParams);
- when(routed.request()).thenReturn(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + tmID + "/log"));
-
- handler.respondAsLeader(ctx, routed, jobManagerGateway);
+ RoutedRequest routedRequest = new RoutedRequest(
+ new RouteResult(
+ "shouldn't be used",
+ "shouldn't be used either",
+ pathParams,
+ new HashMap<>(),
+ new Object()),
+ new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + tmID + "/log"));
+
+ handler.respondAsLeader(ctx, routedRequest, jobManagerGateway);
Assert.assertEquals("Fetching TaskManager log failed.", exception.get());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/router/RouterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/router/RouterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/router/RouterTest.java
new file mode 100644
index 0000000..b67bc77
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/router/RouterTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Set;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod.GET;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod.POST;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link Router}.
+ */
+public class RouterTest {
+ private Router<String> router;
+
+ @Before
+ public void setUp() {
+ router = StringRouter.create();
+ }
+
+ @Test
+ public void testIgnoreSlashesAtBothEnds() {
+ assertEquals("index", router.route(GET, "articles").target());
+ assertEquals("index", router.route(GET, "/articles").target());
+ assertEquals("index", router.route(GET, "//articles").target());
+ assertEquals("index", router.route(GET, "articles/").target());
+ assertEquals("index", router.route(GET, "articles//").target());
+ assertEquals("index", router.route(GET, "/articles/").target());
+ assertEquals("index", router.route(GET, "//articles//").target());
+ }
+
+ @Test
+ public void testEmptyParams() {
+ RouteResult<String> routed = router.route(GET, "/articles");
+ assertEquals("index", routed.target());
+ assertEquals(0, routed.pathParams().size());
+ }
+
+ @Test
+ public void testParams() {
+ RouteResult<String> routed = router.route(GET, "/articles/123");
+ assertEquals("show", routed.target());
+ assertEquals(1, routed.pathParams().size());
+ assertEquals("123", routed.pathParams().get("id"));
+ }
+
+ @Test
+ public void testNone() {
+ RouteResult<String> routed = router.route(GET, "/noexist");
+ assertEquals("404", routed.target());
+ }
+
+ @Test
+ public void testSplatWildcard() {
+ RouteResult<String> routed = router.route(GET, "/download/foo/bar.png");
+ assertEquals("download", routed.target());
+ assertEquals(1, routed.pathParams().size());
+ assertEquals("foo/bar.png", routed.pathParams().get("*"));
+ }
+
+ @Test
+ public void testOrder() {
+ RouteResult<String> routed1 = router.route(GET, "/articles/new");
+ assertEquals("new", routed1.target());
+ assertEquals(0, routed1.pathParams().size());
+
+ RouteResult<String> routed2 = router.route(GET, "/articles/123");
+ assertEquals("show", routed2.target());
+ assertEquals(1, routed2.pathParams().size());
+ assertEquals("123", routed2.pathParams().get("id"));
+
+ RouteResult<String> routed3 = router.route(GET, "/notfound");
+ assertEquals("404", routed3.target());
+ assertEquals(0, routed3.pathParams().size());
+
+ RouteResult<String> routed4 = router.route(GET, "/articles/overview");
+ assertEquals("overview", routed4.target());
+ assertEquals(0, routed4.pathParams().size());
+
+ RouteResult<String> routed5 = router.route(GET, "/articles/overview/detailed");
+ assertEquals("detailed", routed5.target());
+ assertEquals(0, routed5.pathParams().size());
+ }
+
+ @Test
+ public void testAnyMethod() {
+ RouteResult<String> routed1 = router.route(GET, "/anyMethod");
+ assertEquals("anyMethod", routed1.target());
+ assertEquals(0, routed1.pathParams().size());
+
+ RouteResult<String> routed2 = router.route(POST, "/anyMethod");
+ assertEquals("anyMethod", routed2.target());
+ assertEquals(0, routed2.pathParams().size());
+ }
+
+ @Test
+ public void testRemoveByPathPattern() {
+ router.removePathPattern("/articles");
+ RouteResult<String> routed = router.route(GET, "/articles");
+ assertEquals("404", routed.target());
+ }
+
+ @Test
+ public void testAllowedMethods() {
+ assertEquals(9, router.allAllowedMethods().size());
+
+ Set<HttpMethod> methods = router.allowedMethods("/articles");
+ assertEquals(2, methods.size());
+ assertTrue(methods.contains(GET));
+ assertTrue(methods.contains(POST));
+ }
+
+ @Test
+ public void testSubclasses() {
+ Router<Class<? extends Action>> router = new Router<Class<? extends Action>>()
+ .addRoute(GET, "/articles", Index.class)
+ .addRoute(GET, "/articles/:id", Show.class);
+
+ RouteResult<Class<? extends Action>> routed1 = router.route(GET, "/articles");
+ RouteResult<Class<? extends Action>> routed2 = router.route(GET, "/articles/123");
+ assertNotNull(routed1);
+ assertNotNull(routed2);
+ assertEquals(Index.class, routed1.target());
+ assertEquals(Show.class, routed2.target());
+ }
+
+ private static final class StringRouter {
+ // Utility classes should not have a public or default constructor.
+ private StringRouter() { }
+
+ static Router<String> create() {
+ return new Router<String>()
+ .addGet("/articles", "index")
+ .addGet("/articles/new", "new")
+ .addGet("/articles/overview", "overview")
+ .addGet("/articles/overview/detailed", "detailed")
+ .addGet("/articles/:id", "show")
+ .addGet("/articles/:id/:format", "show")
+ .addPost("/articles", "post")
+ .addPatch("/articles/:id", "patch")
+ .addDelete("/articles/:id", "delete")
+ .addAny("/anyMethod", "anyMethod")
+ .addGet("/download/:*", "download")
+ .notFound("404");
+ }
+ }
+
+ private interface Action {
+ }
+
+ private class Index implements Action {
+ }
+
+ private class Show implements Action {
+ }
+}
[3/3] flink git commit: [FLINK-9386] Embed netty router
Posted by uc...@apache.org.
[FLINK-9386] Embed netty router
This commit replaces the netty-router dependency with our own version of it, which is
simplified and adds guarantees about the order in which router patterns are matched.
This is a prerequisite for FLINK-3952. netty-router 1.10 is incompatible with
Netty 4.1, while netty-router 2.2.0 breaks compatibility in such a way that we
were unable to use it.
This closes #6031.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5fa0931
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5fa0931
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5fa0931
Branch: refs/heads/master
Commit: a5fa0931293e84c9e5fc20283b611c6f875ca500
Parents: 62bf8fd
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed May 16 21:26:36 2018 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon May 28 11:21:05 2018 +0200
----------------------------------------------------------------------
.../flink/mesos/util/MesosArtifactServer.java | 20 +-
.../webmonitor/RuntimeMonitorHandler.java | 26 +-
.../runtime/webmonitor/WebRuntimeMonitor.java | 14 +-
.../webmonitor/history/HistoryServer.java | 5 +-
.../HistoryServerStaticFileServerHandler.java | 10 +-
.../webmonitor/utils/WebFrontendBootstrap.java | 9 +-
.../runtime/webmonitor/RedirectHandlerTest.java | 12 +-
...istoryServerStaticFileServerHandlerTest.java | 5 +-
.../flink/runtime/rest/AbstractHandler.java | 15 +-
.../flink/runtime/rest/RestServerEndpoint.java | 17 +-
.../runtime/rest/handler/RedirectHandler.java | 43 +-
.../runtime/rest/handler/RouterHandler.java | 56 ---
.../handler/legacy/ConstantTextHandler.java | 10 +-
.../handler/legacy/TaskManagerLogHandler.java | 8 +-
.../legacy/files/StaticFileServerHandler.java | 14 +-
.../rest/handler/router/MethodlessRouter.java | 125 ++++++
.../rest/handler/router/PathPattern.java | 181 +++++++++
.../rest/handler/router/RouteResult.java | 138 +++++++
.../rest/handler/router/RoutedRequest.java | 97 +++++
.../runtime/rest/handler/router/Router.java | 399 +++++++++++++++++++
.../rest/handler/router/RouterHandler.java | 116 ++++++
.../rest/handler/util/KeepAliveWrite.java | 50 +++
.../legacy/TaskManagerLogHandlerTest.java | 18 +-
.../runtime/rest/handler/router/RouterTest.java | 180 +++++++++
24 files changed, 1405 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 57f4718..d0d41e2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -25,6 +25,9 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.router.Router;
+import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -47,9 +50,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedStream;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
@@ -135,7 +135,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
@Override
protected void initChannel(SocketChannel ch) {
- Handler handler = new Handler(router);
+ RouterHandler handler = new RouterHandler(router, new HashMap<>());
// SSL should be the first handler in the pipeline
if (serverSSLContext != null) {
@@ -148,7 +148,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
- .addLast(handler.name(), handler)
+ .addLast(handler.getName(), handler)
.addLast(new UnknownFileHandler());
}
};
@@ -221,7 +221,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
throw new IllegalArgumentException("not expecting an absolute path");
}
URL fileURL = new URL(baseURL, remoteFile.toString());
- router.ANY(fileURL.getPath(), new VirtualFileServerHandler(path));
+ router.addAny(fileURL.getPath(), new VirtualFileServerHandler(path));
paths.put(remoteFile, fileURL);
@@ -236,7 +236,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
- router.removePath(fileURL.getPath());
+ router.removePathPattern(fileURL.getPath());
}
}
@@ -267,7 +267,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
* Handle HEAD and GET requests for a specific file.
*/
@ChannelHandler.Sharable
- public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<Routed> {
+ public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<RoutedRequest> {
private FileSystem fs;
private Path path;
@@ -284,9 +284,9 @@ public class MesosArtifactServer implements MesosArtifactResolver {
}
@Override
- protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+ protected void channelRead0(ChannelHandlerContext ctx, RoutedRequest routedRequest) throws Exception {
- HttpRequest request = routed.request();
+ HttpRequest request = routedRequest.getRequest();
if (LOG.isDebugEnabled()) {
LOG.debug("{} request for file '{}'", request.getMethod(), path);
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 7109171..c80062f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -25,18 +25,19 @@ import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.WebHandler;
import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.KeepAliveWrite;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +80,7 @@ public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> im
localJobManagerAddressFuture,
retriever,
timeout,
- Collections.singletonMap(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, cfg.getAllowOrigin()));
+ Collections.singletonMap(Names.ACCESS_CONTROL_ALLOW_ORIGIN, cfg.getAllowOrigin()));
this.handler = checkNotNull(handler);
this.allowOrigin = cfg.getAllowOrigin();
}
@@ -89,19 +90,20 @@ public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> im
}
@Override
- protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
+ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, JobManagerGateway jobManagerGateway) {
CompletableFuture<FullHttpResponse> responseFuture;
+ RouteResult<?> result = routedRequest.getRouteResult();
try {
// we only pass the first element in the list to the handlers.
Map<String, String> queryParams = new HashMap<>();
- for (String key : routed.queryParams().keySet()) {
- queryParams.put(key, routed.queryParam(key));
+ for (String key : result.queryParams().keySet()) {
+ queryParams.put(key, result.queryParam(key));
}
- Map<String, String> pathParams = new HashMap<>(routed.pathParams().size());
- for (String key : routed.pathParams().keySet()) {
- pathParams.put(key, URLDecoder.decode(routed.pathParams().get(key), ENCODING.toString()));
+ Map<String, String> pathParams = new HashMap<>(result.pathParams().size());
+ for (String key : result.pathParams().keySet()) {
+ pathParams.put(key, URLDecoder.decode(result.pathParams().get(key), ENCODING.toString()));
}
queryParams.put(WEB_MONITOR_ADDRESS_KEY, localAddressFuture.get());
@@ -131,8 +133,8 @@ public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> im
finalResponse = httpResponse;
}
- finalResponse.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
- KeepAliveWrite.flush(ctx, routed.request(), finalResponse);
+ finalResponse.headers().set(Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
+ KeepAliveWrite.flush(ctx, routedRequest.getRequest(), finalResponse);
});
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index f27ae00..976b080 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -72,6 +72,7 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHand
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarAccessDeniedHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarDeleteHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarListHandler;
@@ -87,7 +88,6 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -320,14 +320,14 @@ public class WebRuntimeMonitor implements WebMonitor {
router
// log and stdout
- .GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
+ .addGet("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
new StaticFileServerHandler<>(
retriever,
localRestAddress,
timeout,
logFiles.logFile))
- .GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
+ .addGet("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile));
// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
@@ -376,7 +376,7 @@ public class WebRuntimeMonitor implements WebMonitor {
}
// this handler serves all the static contents
- router.GET("/:*", new StaticFileServerHandler<>(
+ router.addGet("/:*", new StaticFileServerHandler<>(
retriever,
localRestAddress,
timeout,
@@ -515,7 +515,7 @@ public class WebRuntimeMonitor implements WebMonitor {
private static <T extends ChannelInboundHandler & WebHandler> void get(Router router, T handler) {
for (String path : handler.getPaths()) {
- router.GET(path, handler);
+ router.addGet(path, handler);
}
}
@@ -525,7 +525,7 @@ public class WebRuntimeMonitor implements WebMonitor {
private static <T extends ChannelInboundHandler & WebHandler> void delete(Router router, T handler) {
for (String path : handler.getPaths()) {
- router.DELETE(path, handler);
+ router.addDelete(path, handler);
}
}
@@ -535,7 +535,7 @@ public class WebRuntimeMonitor implements WebMonitor {
private static <T extends ChannelInboundHandler & WebHandler> void post(Router router, T handler) {
for (String path : handler.getPaths()) {
- router.POST(path, handler);
+ router.addPost(path, handler);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index d361934..aacc935 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
@@ -38,8 +39,6 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -216,7 +215,7 @@ public class HistoryServer {
LOG.info("Using directory {} as local cache.", webDir);
Router router = new Router();
- router.GET("/:*", new HistoryServerStaticFileServerHandler(webDir));
+ router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));
if (!webDir.exists() && !webDir.mkdirs()) {
throw new IOException("Failed to create local directory " + webDir.getAbsoluteFile() + ".");
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index 12a27a7..b0c2102 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -27,6 +27,7 @@ package org.apache.flink.runtime.webmonitor.history;
*****************************************************************************/
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -40,7 +41,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
@@ -81,7 +81,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* page is prevented.
*/
@ChannelHandler.Sharable
-public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<Routed> {
+public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<RoutedRequest> {
/** Default logger, if none is specified. */
private static final Logger LOG = LoggerFactory.getLogger(HistoryServerStaticFileServerHandler.class);
@@ -100,10 +100,10 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
// ------------------------------------------------------------------------
@Override
- public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
- String requestPath = routed.path();
+ public void channelRead0(ChannelHandlerContext ctx, RoutedRequest routedRequest) throws Exception {
+ String requestPath = routedRequest.getPath();
- respondWithFile(ctx, routed.request(), requestPath);
+ respondWithFile(ctx, routedRequest.getRequest(), requestPath);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index 740beae..bec9ea2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.webmonitor.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.rest.handler.router.Router;
+import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
import org.apache.flink.runtime.webmonitor.PipelineErrorHandler;
import org.apache.flink.util.Preconditions;
@@ -33,8 +35,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
@@ -47,6 +47,7 @@ import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.util.HashMap;
/**
* This classes encapsulates the boot-strapping of netty for the web-frontend.
@@ -77,7 +78,7 @@ public class WebFrontendBootstrap {
@Override
protected void initChannel(SocketChannel ch) {
- Handler handler = new Handler(WebFrontendBootstrap.this.router);
+ RouterHandler handler = new RouterHandler(WebFrontendBootstrap.this.router, new HashMap<>());
// SSL should be the first handler in the pipeline
if (serverSSLContext != null) {
@@ -91,7 +92,7 @@ public class WebFrontendBootstrap {
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(new HttpRequestHandler(uploadDir))
- .addLast(handler.name(), handler)
+ .addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log));
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
index 98dc20a..6810da4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
@@ -22,7 +22,10 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.KeepAliveWrite;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
@@ -31,9 +34,6 @@ import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
import org.junit.Assert;
import org.junit.Test;
@@ -90,7 +90,7 @@ public class RedirectHandlerTest extends TestLogger {
gatewayRetriever,
timeout);
- router.GET(restPath, testingHandler);
+ router.addGet(restPath, testingHandler);
WebFrontendBootstrap bootstrap = new WebFrontendBootstrap(
router,
log,
@@ -148,10 +148,10 @@ public class RedirectHandlerTest extends TestLogger {
}
@Override
- protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, RestfulGateway gateway) throws Exception {
+ protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, RestfulGateway gateway) throws Exception {
Assert.assertTrue(channelHandlerContext.channel().eventLoop().inEventLoop());
HttpResponse response = HandlerRedirectUtils.getResponse(HttpResponseStatus.OK, RESPONSE_MESSAGE);
- KeepAliveWrite.flush(channelHandlerContext.channel(), routed.request(), response);
+ KeepAliveWrite.flush(channelHandlerContext.channel(), routedRequest.getRequest(), response);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
index 23f0f53..b08504d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
@@ -19,10 +19,9 @@
package org.apache.flink.runtime.webmonitor.history;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
-
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -43,7 +42,7 @@ public class HistoryServerStaticFileServerHandlerTest {
public void testRespondWithFile() throws Exception {
File webDir = tmp.newFolder("webDir");
Router router = new Router()
- .GET("/:*", new HistoryServerStaticFileServerHandler(webDir));
+ .addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));
WebFrontendBootstrap webUI = new WebFrontendBootstrap(
router,
LoggerFactory.getLogger(HistoryServerStaticFileServerHandlerTest.class),
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index cb50a4f..e785def 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.FileUpload;
@@ -43,7 +44,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,13 +83,12 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
}
@Override
- protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, T gateway) throws Exception {
+ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) throws Exception {
+ HttpRequest httpRequest = routedRequest.getRequest();
if (log.isTraceEnabled()) {
- log.trace("Received request " + routed.request().getUri() + '.');
+ log.trace("Received request " + httpRequest.getUri() + '.');
}
- final HttpRequest httpRequest = routed.request();
-
try {
if (!(httpRequest instanceof FullHttpRequest)) {
// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
@@ -152,7 +151,11 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
final HandlerRequest<R, M> handlerRequest;
try {
- handlerRequest = new HandlerRequest<>(request, untypedResponseMessageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
+ handlerRequest = new HandlerRequest<>(
+ request,
+ untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
+ routedRequest.getRouteResult().pathParams(),
+ routedRequest.getRouteResult().queryParams());
} catch (HandlerRequestException hre) {
log.error("Could not create the handler request.", hre);
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 01d1043..fa98dc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -25,7 +25,8 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.RouterHandler;
+import org.apache.flink.runtime.rest.handler.router.Router;
+import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
@@ -39,8 +40,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
@@ -154,7 +153,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
@Override
protected void initChannel(SocketChannel ch) {
- Handler handler = new RouterHandler(router, responseHeaders);
+ RouterHandler handler = new RouterHandler(router, responseHeaders);
// SSL should be the first handler in the pipeline
if (sslEngineFactory != null) {
@@ -166,7 +165,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
.addLast(new FileUploadHandler(uploadDir))
.addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
.addLast(new ChunkedWriteHandler())
- .addLast(handler.name(), handler)
+ .addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(log, responseHeaders));
}
};
@@ -381,16 +380,16 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
switch (specificationHandler.f0.getHttpMethod()) {
case GET:
- router.GET(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+ router.addGet(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
break;
case POST:
- router.POST(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+ router.addPost(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
break;
case DELETE:
- router.DELETE(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+ router.addDelete(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
break;
case PATCH:
- router.PATCH(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+ router.addPatch(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
break;
default:
throw new RuntimeException("Unsupported http method: " + specificationHandler.f0.getHttpMethod() + '.');
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
index 6a3fb7d..1b01b39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
@@ -19,8 +19,10 @@
package org.apache.flink.runtime.rest.handler;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.handler.util.KeepAliveWrite;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -30,10 +32,9 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
@@ -53,7 +54,7 @@ import java.util.concurrent.TimeUnit;
* @param <T> type of the leader to retrieve
*/
@ChannelHandler.Sharable
-public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<RoutedRequest> {
protected final Logger logger = LoggerFactory.getLogger(getClass());
@@ -82,7 +83,9 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
@Override
protected void channelRead0(
ChannelHandlerContext channelHandlerContext,
- Routed routed) throws Exception {
+ RoutedRequest routedRequest) throws Exception {
+
+ HttpRequest request = routedRequest.getRequest();
if (localAddressFuture.isDone()) {
if (localAddress == null) {
@@ -93,7 +96,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
HandlerUtils.sendErrorResponse(
channelHandlerContext,
- routed.request(),
+ request,
new ErrorResponseBody("Fatal error. Could not obtain local address. Please try to refresh."),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
responseHeaders);
@@ -113,7 +116,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
timeout);
// retain the message for the asynchronous handler
- ReferenceCountUtil.retain(routed);
+ ReferenceCountUtil.retain(routedRequest);
optRedirectAddressFuture.whenCompleteAsync(
(Optional<String> optRedirectAddress, Throwable throwable) -> {
@@ -124,32 +127,32 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
HandlerUtils.sendErrorResponse(
channelHandlerContext,
- routed.request(),
+ request,
new ErrorResponseBody("Could not retrieve the redirect address of the current leader. Please try to refresh."),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
responseHeaders);
} else if (optRedirectAddress.isPresent()) {
response = HandlerRedirectUtils.getRedirectResponse(
optRedirectAddress.get(),
- routed.path());
+ routedRequest.getPath());
- KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
+ KeepAliveWrite.flush(channelHandlerContext, request, response);
} else {
try {
- respondAsLeader(channelHandlerContext, routed, gateway);
+ respondAsLeader(channelHandlerContext, routedRequest, gateway);
} catch (Exception e) {
logger.error("Error while responding as leader.", e);
- HandlerUtils.sendErrorResponse(
+ HandlerUtils.sendErrorResponse(
channelHandlerContext,
- routed.request(),
- new ErrorResponseBody("Error while responding to the request."),
- HttpResponseStatus.INTERNAL_SERVER_ERROR,
- responseHeaders);
+ request,
+ new ErrorResponseBody("Error while responding to the request."),
+ HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ responseHeaders);
}
}
} finally {
// release the message after processing it asynchronously
- ReferenceCountUtil.release(routed);
+ ReferenceCountUtil.release(routedRequest);
}
}
, channelHandlerContext.executor());
@@ -158,7 +161,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
() ->
HandlerUtils.sendErrorResponse(
channelHandlerContext,
- routed.request(),
+ request,
new ErrorResponseBody("Service temporarily unavailable due to an ongoing leader election. Please refresh."),
HttpResponseStatus.SERVICE_UNAVAILABLE,
responseHeaders));
@@ -168,7 +171,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
HandlerUtils.sendErrorResponse(
channelHandlerContext,
- routed.request(),
+ request,
new ErrorResponseBody("Error occurred in RedirectHandler: " + throwable.getMessage() + '.'),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
responseHeaders);
@@ -176,12 +179,12 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
} else {
HandlerUtils.sendErrorResponse(
channelHandlerContext,
- routed.request(),
+ request,
new ErrorResponseBody("Local address has not been resolved. This indicates an internal error."),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
responseHeaders);
}
}
- protected abstract void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception;
+ protected abstract void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest request, T gateway) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
deleted file mode 100644
index fc02250..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler;
-
-import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
-
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
-
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * This class is an extension of {@link Handler} that replaces the standard error response to be identical with those
- * sent by the {@link AbstractRestHandler}.
- */
-public class RouterHandler extends Handler {
-
- private final Map<String, String> responseHeaders;
-
- public RouterHandler(Router router, final Map<String, String> responseHeaders) {
- super(router);
- this.responseHeaders = requireNonNull(responseHeaders);
- }
-
- @Override
- protected void respondNotFound(ChannelHandlerContext ctx, HttpRequest request) {
- HandlerUtils.sendErrorResponse(
- ctx,
- request,
- new ErrorResponseBody("Not found."),
- HttpResponseStatus.NOT_FOUND,
- responseHeaders);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
index 57214f0..490a3c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.util.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -29,14 +31,12 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
/**
* Responder that returns a constant String.
*/
@ChannelHandler.Sharable
-public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
+public class ConstantTextHandler extends SimpleChannelInboundHandler<RoutedRequest> {
private final byte[] encodedText;
@@ -45,13 +45,13 @@ public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
}
@Override
- protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+ protected void channelRead0(ChannelHandlerContext ctx, RoutedRequest routed) throws Exception {
HttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText));
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
- KeepAliveWrite.flush(ctx, routed.request(), response);
+ KeepAliveWrite.flush(ctx, routed.getRequest(), response);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index ed06d3d..3286b22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.WebHandler;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FlinkException;
@@ -52,7 +53,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
@@ -148,7 +148,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
* Response when running with leading JobManager.
*/
@Override
- protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final JobManagerGateway jobManagerGateway) {
+ protected void respondAsLeader(final ChannelHandlerContext ctx, final RoutedRequest routedRequest, final JobManagerGateway jobManagerGateway) {
if (cache == null) {
CompletableFuture<Integer> blobPortFuture = jobManagerGateway.requestBlobServerPort(timeout);
cache = blobPortFuture.thenApplyAsync(
@@ -162,8 +162,8 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
executor);
}
- final String taskManagerId = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
- final HttpRequest request = routed.request();
+ final String taskManagerId = routedRequest.getRouteResult().param(TaskManagersHandler.TASK_MANAGER_ID_KEY);
+ final HttpRequest request = routedRequest.getRequest();
//fetch TaskManager logs if no other process is currently doing it
if (lastRequestPending.putIfAbsent(taskManagerId, true) == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
index 5159e12..62b94e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
@@ -28,6 +28,7 @@ package org.apache.flink.runtime.rest.handler.legacy.files;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.util.MimeTypes;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -47,7 +48,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
@@ -125,19 +125,19 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectH
// ------------------------------------------------------------------------
@Override
- protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception {
- final HttpRequest request = routed.request();
+ protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, T gateway) throws Exception {
+ final HttpRequest request = routedRequest.getRequest();
final String requestPath;
// make sure we request the "index.html" in case there is a directory request
- if (routed.path().endsWith("/")) {
- requestPath = routed.path() + "index.html";
+ if (routedRequest.getPath().endsWith("/")) {
+ requestPath = routedRequest.getPath() + "index.html";
}
// in case the files being accessed are logs or stdout files, find appropriate paths.
- else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) {
+ else if (routedRequest.getPath().equals("/jobmanager/log") || routedRequest.getPath().equals("/jobmanager/stdout")) {
requestPath = "";
} else {
- requestPath = routed.path();
+ requestPath = routedRequest.getPath();
}
respondToRequest(channelHandlerContext, request, requestPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java
new file mode 100644
index 0000000..e37116b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}.
+ *
+ * <p>Router that doesn't contain information about HTTP request methods. Routes are stored in a
+ * {@link LinkedHashMap} and are therefore tried in the order in which they were added.
+ *
+ * <p>This class is based on:
+ * https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/MethodlessRouter.java
+ * https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/OrderlessRouter.java
+ *
+ * @param <T> type of the route targets
+ */
+final class MethodlessRouter<T> {
+    // Not referenced anywhere inside this class at the moment.
+    private static final Logger log = LoggerFactory.getLogger(MethodlessRouter.class);
+
+    // A path pattern can only point to one target; iteration order is insertion order.
+    private final Map<PathPattern, T> routes = new LinkedHashMap<>();
+
+    //--------------------------------------------------------------------------
+
+    /**
+     * Returns all routes in this router, an unmodifiable map of {@code PathPattern -> Target}.
+     */
+    public Map<PathPattern, T> routes() {
+        return Collections.unmodifiableMap(routes);
+    }
+
+    /**
+     * Registers {@code target} for {@code pathPattern}.
+     *
+     * <p>This method does nothing if the path pattern has already been added.
+     * A path pattern can only point to one target.
+     *
+     * @param pathPattern pattern to register, see {@link PathPattern}
+     * @param target target returned by {@link #route} when the pattern matches
+     * @return this router, to allow call chaining
+     */
+    public MethodlessRouter<T> addRoute(String pathPattern, T target) {
+        PathPattern p = new PathPattern(pathPattern);
+        // containsKey (rather than a null check on get) so that the first registration always wins.
+        if (!routes.containsKey(p)) {
+            routes.put(p, target);
+        }
+        return this;
+    }
+
+    //--------------------------------------------------------------------------
+
+    /**
+     * Removes the route specified by the path pattern. Does nothing if no such route exists.
+     */
+    public void removePathPattern(String pathPattern) {
+        routes.remove(new PathPattern(pathPattern));
+    }
+
+    //--------------------------------------------------------------------------
+
+    /**
+     * Finds the first route, in insertion order, whose pattern matches {@code pathTokens}.
+     *
+     * @param uri request URI, passed through to the {@link RouteResult}
+     * @param decodedPath decoded request path, passed through to the {@link RouteResult}
+     * @param queryParameters query parameters, passed through to the {@link RouteResult}
+     * @param pathTokens request path split into tokens, matched against each pattern
+     * @return the result for the first matching route, {@code null} if no match
+     */
+    public RouteResult<T> route(String uri, String decodedPath, Map<String, List<String>> queryParameters, String[] pathTokens) {
+        // One params map is shared by all attempts and cleared after every failed one.
+        Map<String, String> pathParams = new HashMap<>();
+        for (Entry<PathPattern, T> entry : routes.entrySet()) {
+            PathPattern pattern = entry.getKey();
+            if (pattern.match(pathTokens, pathParams)) {
+                T target = entry.getValue();
+                return new RouteResult<>(uri, decodedPath, pathParams, queryParameters, target);
+            }
+
+            // A failed match may have left partial entries behind; reset for the next try.
+            pathParams.clear();
+        }
+
+        return null;
+    }
+
+    /**
+     * Checks if there's any matching route.
+     *
+     * @param requestPathTokens request path split into tokens
+     * @return {@code true} if at least one registered pattern matches
+     */
+    public boolean anyMatched(String[] requestPathTokens) {
+        // The collected params are discarded; the map only exists because match() requires one.
+        Map<String, String> pathParams = new HashMap<>();
+        for (PathPattern pattern : routes.keySet()) {
+            if (pattern.match(requestPathTokens, pathParams)) {
+                return true;
+            }
+
+            // Reset for the next loop
+            pathParams.clear();
+        }
+
+        return false;
+    }
+
+    /**
+     * Returns the number of registered routes.
+     */
+    public int size() {
+        return routes.size();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/PathPattern.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/PathPattern.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/PathPattern.java
new file mode 100644
index 0000000..ff32e4f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/PathPattern.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}.
+ * Original code:
+ * https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/PathPattern.java
+ *
+ * <p>The pattern can contain constants or placeholders, example:
+ * {@code constant1/:placeholder1/constant2/:*}.
+ *
+ * <p>{@code :*} is a special placeholder to catch the rest of the path
+ * (may include slashes). If exists, it must appear at the end of the path.
+ *
+ * <p>The pattern must not contain query, example:
+ * {@code constant1/constant2?foo=bar}.
+ *
+ * <p>The pattern will be broken to tokens, example:
+ * {@code ["constant1", ":variable", "constant2", ":*"]}
+ */
+final class PathPattern {
+    private final String pattern;
+    private final String[] tokens;
+
+    //--------------------------------------------------------------------------
+
+    /**
+     * The pattern must not contain query, example:
+     * {@code constant1/constant2?foo=bar}.
+     *
+     * <p>The pattern will be stored without slashes at both ends.
+     *
+     * @param pattern path pattern, must not be {@code null}
+     * @throws NullPointerException if {@code pattern} is {@code null}
+     * @throws IllegalArgumentException if {@code pattern} contains a {@code '?'}
+     */
+    public PathPattern(String pattern) {
+        // Validate before the first dereference so that a null argument is reported by name.
+        checkNotNull(pattern, "pattern");
+        if (pattern.contains("?")) {
+            throw new IllegalArgumentException("Path pattern must not contain query");
+        }
+
+        this.pattern = removeSlashesAtBothEnds(pattern);
+        this.tokens = this.pattern.split("/");
+    }
+
+    /**
+     * Strips all leading and trailing {@code '/'} characters from {@code path}.
+     *
+     * @param path path to trim, must not be {@code null}
+     * @return the trimmed path; the empty string if {@code path} is empty or consists only of slashes
+     */
+    public static String removeSlashesAtBothEnds(String path) {
+        checkNotNull(path, "path");
+
+        if (path.isEmpty()) {
+            return path;
+        }
+
+        int beginIndex = 0;
+        while (beginIndex < path.length() && path.charAt(beginIndex) == '/') {
+            beginIndex++;
+        }
+        if (beginIndex == path.length()) {
+            // Nothing but slashes.
+            return "";
+        }
+
+        int endIndex = path.length() - 1;
+        while (endIndex > beginIndex && path.charAt(endIndex) == '/') {
+            endIndex--;
+        }
+
+        return path.substring(beginIndex, endIndex + 1);
+    }
+
+    /**
+     * Returns the pattern given at the constructor, without slashes at both ends.
+     */
+    public String pattern() {
+        return pattern;
+    }
+
+    /**
+     * Returns the pattern given at the constructor, without slashes at both ends,
+     * and split by {@code '/'}.
+     *
+     * @return a copy of the tokens, so that callers cannot alter what this pattern matches
+     */
+    public String[] tokens() {
+        return tokens.clone();
+    }
+
+    //--------------------------------------------------------------------------
+    // Instances of this class can be conveniently used as Map keys.
+
+    @Override
+    public int hashCode() {
+        return pattern.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (!(o instanceof PathPattern)) {
+            return false;
+        }
+
+        PathPattern otherPathPattern = (PathPattern) o;
+        return pattern.equals(otherPathPattern.pattern);
+    }
+
+    //--------------------------------------------------------------------------
+
+    /**
+     * {@code params} will be updated with params embedded in the request path.
+     *
+     * <p>This method signature is designed so that {@code requestPathTokens} and {@code params}
+     * can be created only once then reused, to optimize for performance when a
+     * large number of path patterns need to be matched.
+     *
+     * @param requestPathTokens request path split by {@code '/'}
+     * @param params receives the placeholder values; the rest of the path caught by a trailing
+     *     {@code :*} is stored under the key {@code "*"}
+     * @return {@code false} if not matched; in this case params should be reset
+     */
+    public boolean match(String[] requestPathTokens, Map<String, String> params) {
+        if (tokens.length == requestPathTokens.length) {
+            // Same number of tokens: compare one by one (a trailing ":*" acts as a plain placeholder).
+            return matchLeadingTokens(requestPathTokens, params, tokens.length);
+        }
+
+        boolean endsWithWildcard = tokens.length > 0 && tokens[tokens.length - 1].equals(":*");
+        if (endsWithWildcard && tokens.length < requestPathTokens.length) {
+            // Every token in front of ":*" has to match, including the one directly before it.
+            if (!matchLeadingTokens(requestPathTokens, params, tokens.length - 1)) {
+                return false;
+            }
+
+            // The last :* part swallows the remaining request tokens, re-joined with '/'.
+            StringBuilder rest = new StringBuilder(requestPathTokens[tokens.length - 1]);
+            for (int i = tokens.length; i < requestPathTokens.length; i++) {
+                rest.append('/');
+                rest.append(requestPathTokens[i]);
+            }
+            params.put("*", rest.toString());
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Matches the first {@code count} pattern tokens against the request tokens at the same
+     * positions, storing placeholder values in {@code params}.
+     */
+    private boolean matchLeadingTokens(String[] requestPathTokens, Map<String, String> params, int count) {
+        for (int i = 0; i < count; i++) {
+            String key = tokens[i];
+            String value = requestPathTokens[i];
+
+            if (key.length() > 0 && key.charAt(0) == ':') {
+                // This is a placeholder
+                params.put(key.substring(1), value);
+            } else if (!key.equals(value)) {
+                // This is a constant
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouteResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouteResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouteResult.java
new file mode 100644
index 0000000..8d75089
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouteResult.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.util.internal.ObjectUtil;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}.
+ * Original code:
+ * https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/RouteResult.java
+ *
+ * <p>Result of calling {@link Router#route(HttpMethod, String)}.
+ *
+ * @param <T> type of the route target
+ */
+public class RouteResult<T> {
+    private final String uri;
+    private final String decodedPath;
+
+    private final Map<String, String> pathParams;
+    private final Map<String, List<String>> queryParams;
+
+    private final T target;
+
+    /**
+     * The maps will be wrapped in Collections.unmodifiableMap. They are not copied, so the caller
+     * must not keep modifying them afterwards.
+     *
+     * @throws NullPointerException if any argument is {@code null}
+     */
+    public RouteResult(
+            String uri, String decodedPath,
+            Map<String, String> pathParams, Map<String, List<String>> queryParams,
+            T target) {
+        this.uri = ObjectUtil.checkNotNull(uri, "uri");
+        this.decodedPath = ObjectUtil.checkNotNull(decodedPath, "decodedPath");
+        this.pathParams = Collections.unmodifiableMap(ObjectUtil.checkNotNull(pathParams, "pathParams"));
+        this.queryParams = Collections.unmodifiableMap(ObjectUtil.checkNotNull(queryParams, "queryParams"));
+        this.target = ObjectUtil.checkNotNull(target, "target");
+    }
+
+    /**
+     * Returns the original request URI.
+     */
+    public String uri() {
+        return uri;
+    }
+
+    /**
+     * Returns the decoded request path.
+     */
+    public String decodedPath() {
+        return decodedPath;
+    }
+
+    /**
+     * Returns all params embedded in the request path.
+     */
+    public Map<String, String> pathParams() {
+        return pathParams;
+    }
+
+    /**
+     * Returns all params in the query part of the request URI.
+     */
+    public Map<String, List<String>> queryParams() {
+        return queryParams;
+    }
+
+    /**
+     * Returns the target the request was routed to.
+     */
+    public T target() {
+        return target;
+    }
+
+    //----------------------------------------------------------------------------
+    // Utilities to get params.
+
+    /**
+     * Extracts the first matching param in {@code queryParams}.
+     *
+     * @return {@code null} if there's no match, i.e. the name is absent or has no values
+     */
+    public String queryParam(String name) {
+        List<String> values = queryParams.get(name);
+        // An empty value list counts as "no match" instead of failing with IndexOutOfBoundsException.
+        return (values == null || values.isEmpty()) ? null : values.get(0);
+    }
+
+    /**
+     * Extracts the param in {@code pathParams} first, then falls back to the first matching
+     * param in {@code queryParams}.
+     *
+     * @return {@code null} if there's no match
+     */
+    public String param(String name) {
+        String pathValue = pathParams.get(name);
+        return (pathValue == null) ? queryParam(name) : pathValue;
+    }
+
+    /**
+     * Extracts all params in {@code pathParams} and {@code queryParams} matching the name.
+     * Query values come first, followed by the path value.
+     *
+     * @return Unmodifiable list; the list is empty if there's no match
+     */
+    public List<String> params(String name) {
+        List<String> values = queryParams.get(name);
+        String value = pathParams.get(name);
+
+        if (values == null) {
+            return (value == null) ? Collections.<String>emptyList() : Collections.singletonList(value);
+        }
+
+        if (value == null) {
+            return Collections.unmodifiableList(values);
+        }
+
+        List<String> aggregated = new ArrayList<>(values.size() + 1);
+        aggregated.addAll(values);
+        aggregated.add(value);
+        return Collections.unmodifiableList(aggregated);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
new file mode 100644
index 0000000..96f5a3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCounted;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Pairs an {@link HttpRequest} with the {@link RouteResult} it was routed to.
+ *
+ * <p>All {@link ReferenceCounted} calls are forwarded to the wrapped request if that request is
+ * itself {@link ReferenceCounted}; otherwise they have no effect.
+ *
+ * @param <T> type of the route target
+ */
+public class RoutedRequest<T> implements ReferenceCounted {
+    private final RouteResult<T> result;
+    private final HttpRequest request;
+
+    // Empty if the wrapped request does not implement ReferenceCounted.
+    private final Optional<ReferenceCounted> requestAsReferenceCounted;
+    private final QueryStringDecoder queryStringDecoder;
+
+    /**
+     * @param result routing result for {@code request}, must not be {@code null}
+     * @param request the routed request, must not be {@code null}
+     */
+    public RoutedRequest(RouteResult<T> result, HttpRequest request) {
+        this.result = checkNotNull(result);
+        this.request = checkNotNull(request);
+        if (request instanceof ReferenceCounted) {
+            this.requestAsReferenceCounted = Optional.of((ReferenceCounted) request);
+        } else {
+            this.requestAsReferenceCounted = Optional.empty();
+        }
+        this.queryStringDecoder = new QueryStringDecoder(request.getUri());
+    }
+
+    /**
+     * Returns the routing result associated with the request.
+     */
+    public RouteResult<T> getRouteResult() {
+        return result;
+    }
+
+    /**
+     * Returns the wrapped request.
+     */
+    public HttpRequest getRequest() {
+        return request;
+    }
+
+    /**
+     * Returns the path of the request URI as reported by the {@link QueryStringDecoder}.
+     */
+    public String getPath() {
+        return queryStringDecoder.path();
+    }
+
+    /**
+     * Returns the reference count of the wrapped request, or {@code 0} if it is not reference counted.
+     */
+    @Override
+    public int refCnt() {
+        return requestAsReferenceCounted.map(ReferenceCounted::refCnt).orElse(0);
+    }
+
+    /**
+     * Releases the wrapped request; returns {@code true} if it is not reference counted.
+     */
+    @Override
+    public boolean release() {
+        return requestAsReferenceCounted.map(counted -> counted.release()).orElse(true);
+    }
+
+    /**
+     * Releases the wrapped request {@code arg0} times; returns {@code true} if it is not reference counted.
+     */
+    @Override
+    public boolean release(int arg0) {
+        return requestAsReferenceCounted.map(counted -> counted.release(arg0)).orElse(true);
+    }
+
+    /**
+     * Retains the wrapped request, if it is reference counted, and returns this object.
+     */
+    @Override
+    public ReferenceCounted retain() {
+        requestAsReferenceCounted.ifPresent(counted -> counted.retain());
+        return this;
+    }
+
+    /**
+     * Retains the wrapped request {@code arg0} times, if it is reference counted, and returns this object.
+     */
+    @Override
+    public ReferenceCounted retain(int arg0) {
+        requestAsReferenceCounted.ifPresent(counted -> counted.retain(arg0));
+        return this;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/Router.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/Router.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/Router.java
new file mode 100644
index 0000000..aa163fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/Router.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. Compared to original version this one
+ * defines and guarantees an order of pattern matching routes, drops reverse routing feature and restores
+ * {@link RouterHandler} which was dropped in tv.cntt:netty-router 2.X.X. Original code:
+ * https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/Router.java
+ *
+ * <p>Router that contains information about both route matching orders and
+ * HTTP request methods.
+ *
+ * <p>Routes are guaranteed to be matched in order of their addition.
+ *
+ * <p>Route targets can be any type. In the below example, targets are classes:
+ *
+ * <pre>
+ * {@code
+ * Router<Class> router = new Router<Class>()
+ * .GET ("/articles", IndexHandler.class)
+ * .GET ("/articles/:id", ShowHandler.class)
+ * .POST ("/articles", CreateHandler.class)
+ * .GET ("/download/:*", DownloadHandler.class) // ":*" must be the last token
+ * .GET_FIRST("/articles/new", NewHandler.class); // This will be matched first
+ * }
+ * </pre>
+ *
+ * <p>Slashes at both ends are ignored. These are the same:
+ *
+ * <pre>
+ * {@code
+ * router.GET("articles", IndexHandler.class);
+ * router.GET("/articles", IndexHandler.class);
+ * router.GET("/articles/", IndexHandler.class);
+ * }
+ * </pre>
+ *
+ * <p>You can remove routes by target or by path pattern:
+ *
+ * <pre>
+ * {@code
+ * router.removePathPattern("/articles");
+ * }
+ * </pre>
+ *
+ * <p>To match requests use {@link #route(HttpMethod, String)}.
+ *
+ * <p>From the {@link RouteResult} you can extract params embedded in
+ * the path and from the query part of the request URI.
+ *
+ * <p>{@link #notFound(Object)} will be used as the target when there's no match.
+ *
+ * <pre>
+ * {@code
+ * router.notFound(My404Handler.class);
+ * }
+ * </pre>
+ */
+public class Router<T> {
+ private final Map<HttpMethod, MethodlessRouter<T>> routers =
+ new HashMap<HttpMethod, MethodlessRouter<T>>();
+
+ private final MethodlessRouter<T> anyMethodRouter =
+ new MethodlessRouter<T>();
+
+ private T notFound;
+
+ //--------------------------------------------------------------------------
+ // Design decision:
+ // We do not allow access to routers and anyMethodRouter, because we don't
+ // want to expose MethodlessRouter, OrderlessRouter, and PathPattern.
+ // Exposing those will complicate the use of this package.
+
+ /**
+  * Helper for toString: appends one entry per route to each of the three accumulator lists.
+  *
+  * @param method label added to {@code accMethods} for every route
+  * @param routes routes to aggregate
+  * @param accMethods receives {@code method} once per route
+  * @param accPatterns receives every pattern, prefixed with {@code "/"}
+  * @param accTargets receives the string form of every target
+  */
+ private static <T> void aggregateRoutes(
+         String method, Map<PathPattern, T> routes,
+         List<String> accMethods, List<String> accPatterns, List<String> accTargets) {
+     for (Map.Entry<PathPattern, T> route : routes.entrySet()) {
+         PathPattern pathPattern = route.getKey();
+         T target = route.getValue();
+
+         accMethods.add(method);
+         accPatterns.add("/" + pathPattern.pattern());
+         accTargets.add(targetToString(target));
+     }
+ }
+
+ /**
+  * Helper for toString.
+  *
+  * @return the length of the longest string in {@code coll}, {@code 0} if it is empty
+  */
+ private static int maxLength(List<String> coll) {
+     int longest = 0;
+     for (String element : coll) {
+         longest = Math.max(longest, element.length());
+     }
+     return longest;
+ }
+
+ /**
+  * Helper for toString.
+  *
+  * <p>A {@link Class} target is rendered by its name, so that the result is
+  * "io.netty.example.http.router.HttpRouterServerHandler" instead of
+  * "class io.netty.example.http.router.HttpRouterServerHandler".
+  * Any other target is rendered with its own {@code toString()}.
+  */
+ private static String targetToString(Object target) {
+     return (target instanceof Class)
+         ? ((Class<?>) target).getName()
+         : target.toString();
+ }
+
+ /**
+  * Returns the fallback target for use when there's no match at
+  * {@link #route(HttpMethod, String)}; {@code null} unless one has been set
+  * with {@link #notFound(Object)}.
+  */
+ public T notFound() {
+     return this.notFound;
+ }
+
+ /**
+  * Returns the number of routes in this router, counting the routes registered
+  * for a specific HTTP method as well as those registered for any method.
+  */
+ public int size() {
+     int total = 0;
+     for (MethodlessRouter<T> methodRouter : routers.values()) {
+         total += methodRouter.size();
+     }
+     return total + anyMethodRouter.size();
+ }
+
+ //--------------------------------------------------------------------------
+
+ /**
+  * Add route.
+  *
+  * <p>A path pattern can only point to one target. This method does nothing if the pattern
+  * has already been added.
+  *
+  * @param method HTTP method the route applies to; {@code null} registers it for any method
+  * @param pathPattern path pattern of the route
+  * @param target target of the route
+  * @return this router, to allow call chaining
+  */
+ public Router<T> addRoute(HttpMethod method, String pathPattern, T target) {
+     MethodlessRouter<T> methodRouter = getMethodlessRouter(method);
+     methodRouter.addRoute(pathPattern, target);
+     return this;
+ }
+
+ //--------------------------------------------------------------------------
+
+ /**
+ * Sets the fallback target for use when there's no match at
+ * {@link #route(HttpMethod, String)}.
+ *
+ * @param target the new fallback target; passing {@code null} makes unmatched
+ * {@code route} calls return {@code null}
+ * @return this router, so calls can be chained
+ */
+ public Router<T> notFound(T target) {
+ this.notFound = target;
+ return this;
+ }
+
+ private MethodlessRouter<T> getMethodlessRouter(HttpMethod method) {
+ if (method == null) {
+ return anyMethodRouter;
+ }
+
+ MethodlessRouter<T> router = routers.get(method);
+ if (router == null) {
+ router = new MethodlessRouter<T>();
+ routers.put(method, router);
+ }
+
+ return router;
+ }
+
+ /**
+  * Removes the route specified by the path pattern from every method-specific router
+  * and from the any-method router.
+  *
+  * @param pathPattern path pattern of the route to remove
+  */
+ public void removePathPattern(String pathPattern) {
+     for (MethodlessRouter<T> methodRouter : routers.values()) {
+         methodRouter.removePathPattern(pathPattern);
+     }
+
+     anyMethodRouter.removePathPattern(pathPattern);
+ }
+
+ /**
+  * Routes a request that carries no query parameters.
+  *
+  * <p>If there's no match, returns the result with {@link #notFound(Object) notFound}
+  * as the target if it is set, otherwise returns {@code null}.
+  */
+ public RouteResult<T> route(HttpMethod method, String path) {
+     final Map<String, List<String>> noQueryParameters = Collections.emptyMap();
+     return route(method, path, noQueryParameters);
+ }
+
+ /**
+  * Routes a request to a target.
+  *
+  * <p>The router registered for {@code method} is tried first (the any-method router if
+  * there is none), then the any-method router. If neither matches, the result carries the
+  * {@code notFound} target with empty path parameters, or {@code null} is returned when no
+  * such target is set.
+  *
+  * @param method HTTP method of the request
+  * @param path request path; also passed on as both path arguments of the result
+  * @param queryParameters query parameters to attach to the result
+  * @return the routing result, or {@code null} if nothing matched and no fallback is set
+  */
+ public RouteResult<T> route(HttpMethod method, String path, Map<String, List<String>> queryParameters) {
+     final MethodlessRouter<T> methodRouter = routers.get(method);
+     final MethodlessRouter<T> primary = (methodRouter != null) ? methodRouter : anyMethodRouter;
+
+     final String[] pathTokens = decodePathTokens(path);
+
+     RouteResult<T> match = primary.route(path, path, queryParameters, pathTokens);
+     if (match == null && primary != anyMethodRouter) {
+         // The method-specific router had no match: give the any-method routes a chance.
+         match = anyMethodRouter.route(path, path, queryParameters, pathTokens);
+     }
+
+     if (match != null) {
+         return new RouteResult<T>(path, path, match.pathParams(), queryParameters, match.target());
+     }
+
+     if (notFound == null) {
+         return null;
+     }
+
+     return new RouteResult<T>(path, path, Collections.<String, String>emptyMap(), queryParameters, notFound);
+ }
+
+ //--------------------------------------------------------------------------
+
+ private String[] decodePathTokens(String uri) {
+ // Need to split the original URI (instead of QueryStringDecoder#path) then decode the tokens (components),
+ // otherwise /test1/123%2F456 will not match /test1/:p1
+
+ int qPos = uri.indexOf("?");
+ String encodedPath = (qPos >= 0) ? uri.substring(0, qPos) : uri;
+
+ String[] encodedTokens = PathPattern.removeSlashesAtBothEnds(encodedPath).split("/");
+
+ String[] decodedTokens = new String[encodedTokens.length];
+ for (int i = 0; i < encodedTokens.length; i++) {
+ String encodedToken = encodedTokens[i];
+ decodedTokens[i] = QueryStringDecoder.decodeComponent(encodedToken);
+ }
+
+ return decodedTokens;
+ }
+
+ /**
+  * Returns allowed methods for a specific URI.
+  *
+  * <p>The URI is tokenized with {@code decodePathTokens}, the same way
+  * {@link #route(HttpMethod, String)} does it, so a URI with percent-encoded path
+  * components (for example {@code /test1/123%2F456}) is matched identically by both methods.
+  *
+  * <p>For {@code OPTIONS *}, use {@link #allAllowedMethods()} instead of this method.
+  *
+  * @param uri request URI; the query string, if any, is ignored
+  * @return the result of {@link #allAllowedMethods()} if a route registered for any method
+  *     matches, otherwise the methods whose own router has a matching route (possibly empty)
+  */
+ public Set<HttpMethod> allowedMethods(String uri) {
+     // Split the encoded path first and decode each component afterwards, exactly like
+     // route(...); splitting QueryStringDecoder#path() here made the two disagree.
+     String[] tokens = decodePathTokens(uri);
+
+     if (anyMethodRouter.anyMatched(tokens)) {
+         return allAllowedMethods();
+     }
+
+     Set<HttpMethod> ret = new HashSet<HttpMethod>(routers.size());
+     for (Map.Entry<HttpMethod, MethodlessRouter<T>> entry : routers.entrySet()) {
+         if (entry.getValue().anyMatched(tokens)) {
+             ret.add(entry.getKey());
+         }
+     }
+
+     return ret;
+ }
+
+ /**
+  * Returns all methods that this router handles. For {@code OPTIONS *}.
+  *
+  * @return a new set: the nine methods listed below when at least one route is registered
+  *     for any method, otherwise the methods that have their own router
+  */
+ public Set<HttpMethod> allAllowedMethods() {
+     if (anyMethodRouter.size() <= 0) {
+         return new HashSet<HttpMethod>(routers.keySet());
+     }
+
+     final Set<HttpMethod> everyMethod = new HashSet<HttpMethod>(9);
+     Collections.addAll(
+         everyMethod,
+         HttpMethod.CONNECT,
+         HttpMethod.DELETE,
+         HttpMethod.GET,
+         HttpMethod.HEAD,
+         HttpMethod.OPTIONS,
+         HttpMethod.PATCH,
+         HttpMethod.POST,
+         HttpMethod.PUT,
+         HttpMethod.TRACE);
+     return everyMethod;
+ }
+
+ /**
+  * Returns visualized routing rules: one line per route, formatted as the aligned columns
+  * {@code <method> <pattern> <target>}. Method-specific routes come first, then the
+  * any-method routes (method "*"), then the notFound target (method "*", pattern "*") if set.
+  */
+ @Override
+ public String toString() {
+     // Step 1/2: Collect the three columns, in order
+     final int routeCount = size();
+     final List<String> methodColumn = new ArrayList<String>(routeCount);
+     final List<String> patternColumn = new ArrayList<String>(routeCount);
+     final List<String> targetColumn = new ArrayList<String>(routeCount);
+
+     // Method-specific routes
+     for (Entry<HttpMethod, MethodlessRouter<T>> perMethod : routers.entrySet()) {
+         aggregateRoutes(
+             perMethod.getKey().toString(), perMethod.getValue().routes(),
+             methodColumn, patternColumn, targetColumn);
+     }
+
+     // Routes valid for any method
+     aggregateRoutes("*", anyMethodRouter.routes(), methodColumn, patternColumn, targetColumn);
+
+     // Fallback target
+     if (notFound != null) {
+         methodColumn.add("*");
+         patternColumn.add("*");
+         targetColumn.add(targetToString(notFound));
+     }
+
+     // Step 2/2: Render the rows, left-aligning the first two columns to their widest entry
+     final int methodWidth = maxLength(methodColumn);
+     final int patternWidth = maxLength(patternColumn);
+     final String rowFormat = "%-" + methodWidth + "s %-" + patternWidth + "s %s\n";
+
+     final int rowCount = methodColumn.size();
+     final StringBuilder out = new StringBuilder((methodWidth + 1 + patternWidth + 1 + 20) * rowCount);
+     for (int row = 0; row < rowCount; row++) {
+         out.append(String.format(
+             rowFormat, methodColumn.get(row), patternColumn.get(row), targetColumn.get(row)));
+     }
+     return out.toString();
+ }
+
+ //--------------------------------------------------------------------------
+
+ /**
+ * Adds a route for the {@code CONNECT} method; shorthand for
+ * {@link #addRoute(HttpMethod, String, Object)}.
+ *
+ * @return this router, so calls can be chained
+ */
+ public Router<T> addConnect(String path, T target) {
+ return addRoute(HttpMethod.CONNECT, path, target);
+ }
+
+ /**
+ * Adds a route for the {@code DELETE} method; shorthand for
+ * {@link #addRoute(HttpMethod, String, Object)}.
+ *
+ * @return this router, so calls can be chained
+ */
+ public Router<T> addDelete(String path, T target) {
+ return addRoute(HttpMethod.DELETE, path, target);
+ }
+
+ /**
+ * Adds a route for the {@code GET} method; shorthand for
+ * {@link #addRoute(HttpMethod, String, Object)}.
+ *
+ * @return this router, so calls can be chained
+ */
+ public Router<T> addGet(String path, T target) {
+ return addRoute(HttpMethod.GET, path, target);
+ }
+
+ /**
+ * Adds a route for the {@code HEAD} method; shorthand for
+ * {@link #addRoute(HttpMethod, String, Object)}.
+ *
+ * @return this router, so calls can be chained
+ */
+ public Router<T> addHead(String path, T target) {
+ return addRoute(HttpMethod.HEAD, path, target);
+ }
+
+ /**
+ * Adds a route for the {@code OPTIONS} method; shorthand for
+ * {@link #addRoute(HttpMethod, String, Object)}.
+ *
+ * @return this router, so calls can be chained
+ */
+ public Router<T> addOptions(String path, T target) {
+ return addRoute(HttpMethod.OPTIONS, path, target);
+ }
+
+ /**
+ * Adds a route for the {@code PATCH} method; shorthand for
+ * {@link #addRoute(HttpMethod, String, Object)}.
+ *
+ * @return this router, so calls can be chained
+ */
+ public Router<T> addPatch(String path, T target) {
+ return addRoute(HttpMethod.PATCH, path, target);
+ }
+
+ /**
+ * Adds a route for the {@code POST} method; shorthand for
+ * {@link #addRoute(HttpMethod, String, Object)}.
+ *
+ * @return this router, so calls can be chained
+ */
+ public Router<T> addPost(String path, T target) {
+ return addRoute(HttpMethod.POST, path, target);
+ }
+
+ /**
+ * Adds a route for the {@code PUT} method; shorthand for
+ * {@link #addRoute(HttpMethod, String, Object)}.
+ *
+ * @return this router, so calls can be chained
+ */
+ public Router<T> addPut(String path, T target) {
+ return addRoute(HttpMethod.PUT, path, target);
+ }
+
+ /**
+ * Adds a route for the {@code TRACE} method; shorthand for
+ * {@link #addRoute(HttpMethod, String, Object)}.
+ *
+ * @return this router, so calls can be chained
+ */
+ public Router<T> addTrace(String path, T target) {
+ return addRoute(HttpMethod.TRACE, path, target);
+ }
+
+ /**
+ * Adds a route that is not tied to a specific HTTP method, by calling
+ * {@link #addRoute(HttpMethod, String, Object)} with a {@code null} method.
+ *
+ * @return this router, so calls can be chained
+ */
+ public Router<T> addAny(String path, T target) {
+ return addRoute(null, path, target);
+ }
+}