You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/08/26 14:40:38 UTC
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/4597
[FLINK-7527] [rest] Let AbstractRestHandler extend RedirectHandler
## What is the purpose of the change
By letting the `AbstractRestHandler` extend the` RedirectHandler`, we add redirection
capabilities to the `AbstractRestHandler`.
This PR is based on #4551.
## Verifying this change
This change is already covered by existing tests, such as `RedirectHandlerTest`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink redirectAbstractRestHandler
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4597.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4597
----
commit 7835d0a89c8ecdd5b9661ee8c57a9d63a3ed3742
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-14T13:47:04Z
[FLINK-7522] Add termination future to ClusterEntrypoint
The termination future is completed when the ClusterEntrypoint shuts down. This
allows for easier testing.
commit 2cdf97f824bc62a82e65f4c160b9ad64de446de4
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-16T12:36:13Z
[FLINK-7457] Make Dispatcher highly available
This commit introduces a dispatcher leader election and retrieval service to the
HighAvailabilityServices. Moreover it adds code such that the Dispatcher now takes
part in the leader election process using the afore-mentioned services.
Let Dispatcher participate in leader election
Add test for Dispatcher leader election
commit 04caf85d33ddfc3a4a9b788745b8282c3437d8e2
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-10T08:56:12Z
[FLINK-7409] [web] Make WebRuntimeMonitor reactive
This commit changes the behaviour of the WebRuntimeMonitor to not longer block serving
threads by waiting on the result of futures. Instead the RequestHandler now returns a
CompletableFuture<FullHttpResponse> which is written out to the Netty channel upon
completion. This will improve the performance of our WebRuntimeMonitor.
commit 4fa6dedd95555a2d1a91339ff5effda3bc2bd1d5
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-15T10:00:58Z
[FLINK-7458] Generalize GatewayRetriever for WebRuntimeMonitor
Introduce a generalized GatewayRetriever replacing the JobManagerRetriever. The
GatewayRetriever fulfills the same purpose as the JobManagerRetriever with the
ability to retrieve the gateway for an arbitrary endpoint type.
commit 0f9b2ce77e20f25fc95ddeba98f863b86450a72c
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-15T11:55:47Z
[FLINK-7459] Generalize Flink's redirection logic
Introduce RedirectHandler which can be extended to add redirection functionality to all
SimpleInboundChannelHandlers. This allows to share the same functionality across the
StaticFileServerHandler and the RuntimeMonitorHandlerBase which could now be removed.
In the future, the AbstractRestHandler will also extend the RedirectHandler.
commit 88aed4f7a198b3994271088b8e19558d399ddd9d
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-17T13:04:19Z
[FLINK-7527] [rest] Let AbstractRestHandler extend RedirectHandler
By letting the AbstractRestHandler extend the RedirectHandler, we add redirection
capabilities to the AbstractRestHandler.
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4597#discussion_r139618924
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Utilities for the REST handlers.
+ */
+public class HandlerUtils {
+
+ private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
+
+ /**
+ * Sends the given response and status code to the given channel.
+ *
+ * @param channelHandlerContext identifying the open channel
+ * @param httpRequest originating http request
+ * @param response which should be sent
+ * @param statusCode of the message to send
+ * @param <P> type of the response
+ */
+ public static <P extends ResponseBody> void sendResponse(
+ ChannelHandlerContext channelHandlerContext,
+ HttpRequest httpRequest,
+ P response,
+ HttpResponseStatus statusCode) {
+ StringWriter sw = new StringWriter();
+ try {
+ mapper.writeValue(sw, response);
+ } catch (IOException ioe) {
+ sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ return;
+ }
+ sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+ }
+
+ /**
+ * Sends the given error response and status code to the given channel.
+ *
+ * @param channelHandlerContext identifying the open channel
+ * @param httpRequest originating http request
+ * @param errorMessage which should be sent
+ * @param statusCode of the message to send
+ */
+ public static void sendErrorResponse(
+ ChannelHandlerContext channelHandlerContext,
+ HttpRequest httpRequest,
+ ErrorResponseBody errorMessage,
+ HttpResponseStatus statusCode) {
+
+ StringWriter sw = new StringWriter();
+ try {
+ mapper.writeValue(sw, errorMessage);
+ } catch (IOException e) {
+ // this should never happen
+ sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+ sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+ }
+
+ /**
+ * Sends the given response and status code to the given channel.
+ * @param channelHandlerContext identifying the open channel
--- End diff --
indentation
---
[GitHub] flink issue #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend Redire...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/4597
Rebased onto the latest master.
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4597#discussion_r139675681
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Utilities for the REST handlers.
+ */
+public class HandlerUtils {
+
+ private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
+
+ /**
+ * Sends the given response and status code to the given channel.
+ *
+ * @param channelHandlerContext identifying the open channel
+ * @param httpRequest originating http request
+ * @param response which should be sent
+ * @param statusCode of the message to send
+ * @param <P> type of the response
+ */
+ public static <P extends ResponseBody> void sendResponse(
+ ChannelHandlerContext channelHandlerContext,
+ HttpRequest httpRequest,
+ P response,
+ HttpResponseStatus statusCode) {
+ StringWriter sw = new StringWriter();
+ try {
+ mapper.writeValue(sw, response);
+ } catch (IOException ioe) {
+ sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ return;
+ }
+ sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+ }
+
+ /**
+ * Sends the given error response and status code to the given channel.
+ *
+ * @param channelHandlerContext identifying the open channel
+ * @param httpRequest originating http request
+ * @param errorMessage which should be sent
+ * @param statusCode of the message to send
+ */
+ public static void sendErrorResponse(
+ ChannelHandlerContext channelHandlerContext,
+ HttpRequest httpRequest,
+ ErrorResponseBody errorMessage,
+ HttpResponseStatus statusCode) {
+
+ StringWriter sw = new StringWriter();
+ try {
+ mapper.writeValue(sw, errorMessage);
+ } catch (IOException e) {
+ // this should never happen
+ sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+ sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
+ }
+
+ /**
+ * Sends the given response and status code to the given channel.
+ * @param channelHandlerContext identifying the open channel
--- End diff --
good catch. Will fix it.
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4597#discussion_r139676205
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OptionalConsumer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
+ * REST endpoints.
+ *
+ * @param <T> type of the leader to retrieve
+ */
+@ChannelHandler.Sharable
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private final CompletableFuture<String> localAddressFuture;
+
+ protected final GatewayRetriever<T> leaderRetriever;
+
+ protected final Time timeout;
+
+ /** Whether the web service has https enabled. */
+ protected final boolean httpsEnabled;
+
+ private String localAddress;
+
+ protected RedirectHandler(
+ @Nonnull CompletableFuture<String> localAddressFuture,
+ @Nonnull GatewayRetriever<T> leaderRetriever,
+ @Nonnull Time timeout,
+ boolean httpsEnabled) {
+ this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
+ this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ this.httpsEnabled = httpsEnabled;
+ localAddress = null;
+ }
+
+ @Override
+ protected void channelRead0(
+ ChannelHandlerContext channelHandlerContext,
+ Routed routed) throws Exception {
+
+ try {
+ if (localAddressFuture.isDone()) {
+ if (localAddress == null) {
+ try {
+ localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ logger.error("Could not obtain local address.", e);
+
+ HandlerUtils.sendErrorResponse(
+ channelHandlerContext,
+ routed.request(),
+ new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
--- End diff --
Alright. Will remove them.
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4597
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4597#discussion_r138845374
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OptionalConsumer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
+ * REST endpoints.
+ *
+ * @param <T> type of the leader to retrieve
+ */
+@ChannelHandler.Sharable
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private final CompletableFuture<String> localAddressFuture;
+
+ protected final GatewayRetriever<T> leaderRetriever;
+
+ protected final Time timeout;
+
+ /** Whether the web service has https enabled. */
+ protected final boolean httpsEnabled;
+
+ private String localAddress;
+
+ protected RedirectHandler(
+ @Nonnull CompletableFuture<String> localAddressFuture,
+ @Nonnull GatewayRetriever<T> leaderRetriever,
+ @Nonnull Time timeout,
+ boolean httpsEnabled) {
+ this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
+ this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ this.httpsEnabled = httpsEnabled;
+ localAddress = null;
+ }
+
+ @Override
+ protected void channelRead0(
+ ChannelHandlerContext channelHandlerContext,
+ Routed routed) throws Exception {
+
+ try {
+ if (localAddressFuture.isDone()) {
+ if (localAddress == null) {
+ try {
+ localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ logger.error("Could not obtain local address.", e);
+
+ HandlerUtils.sendErrorResponse(
+ channelHandlerContext,
+ routed.request(),
+ new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
+ HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
+
+ optLeaderConsumer.ifPresent(
+ (T gateway) -> {
+ OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
+ HandlerRedirectUtils.getRedirectAddress(
+ localAddress,
+ gateway,
+ timeout));
+
+ optRedirectAddressConsumer
+ .ifPresent(
+ (CompletableFuture<String> redirectAddressFuture) ->
+ redirectAddressFuture.whenComplete(
--- End diff --
The HTTP redirection seems unnecessary. Why not pass the gateway to the handler in all cases? this would make the handlers act as a proxy to the actual leader which, as I understand, is the behavior we want long-term anyway.
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4597#discussion_r138849512
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OptionalConsumer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
+ * REST endpoints.
+ *
+ * @param <T> type of the leader to retrieve
+ */
+@ChannelHandler.Sharable
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private final CompletableFuture<String> localAddressFuture;
+
+ protected final GatewayRetriever<T> leaderRetriever;
+
+ protected final Time timeout;
+
+ /** Whether the web service has https enabled. */
+ protected final boolean httpsEnabled;
+
+ private String localAddress;
+
+ protected RedirectHandler(
+ @Nonnull CompletableFuture<String> localAddressFuture,
+ @Nonnull GatewayRetriever<T> leaderRetriever,
+ @Nonnull Time timeout,
+ boolean httpsEnabled) {
+ this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
+ this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ this.httpsEnabled = httpsEnabled;
+ localAddress = null;
+ }
+
+ @Override
+ protected void channelRead0(
+ ChannelHandlerContext channelHandlerContext,
+ Routed routed) throws Exception {
+
+ try {
+ if (localAddressFuture.isDone()) {
+ if (localAddress == null) {
+ try {
+ localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ logger.error("Could not obtain local address.", e);
+
+ HandlerUtils.sendErrorResponse(
+ channelHandlerContext,
+ routed.request(),
+ new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
+ HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
+
+ optLeaderConsumer.ifPresent(
+ (T gateway) -> {
+ OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
+ HandlerRedirectUtils.getRedirectAddress(
+ localAddress,
+ gateway,
+ timeout));
+
+ optRedirectAddressConsumer
+ .ifPresent(
+ (CompletableFuture<String> redirectAddressFuture) ->
+ redirectAddressFuture.whenComplete(
--- End diff --
This would be possible if all REST handler related RPCs would return serializable values. Currently, some of the REST handler expect an `ExecutionGraph` as result and thus only work if the rest endpoint runs in the same `ActorSystem` as the `JobManager`. Once this has been changed, we no longer need the HTTP redirect. But right now, this is not the case.
---
[GitHub] flink issue #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend Redire...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/4597
Thanks for the review @zentol. I've addressed your comments. Rebased onto the latest master and if Travis gives green light, I'll merge the PR.
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4597#discussion_r139620406
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OptionalConsumer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
+ * REST endpoints.
+ *
+ * @param <T> type of the leader to retrieve
+ */
+@ChannelHandler.Sharable
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private final CompletableFuture<String> localAddressFuture;
+
+ protected final GatewayRetriever<T> leaderRetriever;
+
+ protected final Time timeout;
+
+ /** Whether the web service has https enabled. */
+ protected final boolean httpsEnabled;
+
+ private String localAddress;
+
+ protected RedirectHandler(
+ @Nonnull CompletableFuture<String> localAddressFuture,
+ @Nonnull GatewayRetriever<T> leaderRetriever,
+ @Nonnull Time timeout,
+ boolean httpsEnabled) {
+ this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
+ this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ this.httpsEnabled = httpsEnabled;
+ localAddress = null;
+ }
+
+ @Override
+ protected void channelRead0(
+ ChannelHandlerContext channelHandlerContext,
+ Routed routed) throws Exception {
+
+ try {
+ if (localAddressFuture.isDone()) {
+ if (localAddress == null) {
+ try {
+ localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ logger.error("Could not obtain local address.", e);
+
+ HandlerUtils.sendErrorResponse(
+ channelHandlerContext,
+ routed.request(),
+ new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
--- End diff --
I would prefer if we did not expose exception stacktraces directly in the API.
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4597#discussion_r139680566
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java ---
@@ -38,6 +40,10 @@ public ErrorResponseBody(String error) {
this(Collections.singletonList(error));
}
+ public ErrorResponseBody(Throwable throwable) {
--- End diff --
This should now be unused.
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4597#discussion_r139805450
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java ---
@@ -38,6 +40,10 @@ public ErrorResponseBody(String error) {
this(Collections.singletonList(error));
}
+ public ErrorResponseBody(Throwable throwable) {
--- End diff --
True. Will remove it.
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4597#discussion_r139624460
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OptionalConsumer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
+ * REST endpoints.
+ *
+ * @param <T> type of the leader to retrieve
+ */
+@ChannelHandler.Sharable
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private final CompletableFuture<String> localAddressFuture;
+
+ protected final GatewayRetriever<T> leaderRetriever;
+
+ protected final Time timeout;
+
+ /** Whether the web service has https enabled. */
+ protected final boolean httpsEnabled;
+
+ private String localAddress;
+
+ protected RedirectHandler(
+ @Nonnull CompletableFuture<String> localAddressFuture,
+ @Nonnull GatewayRetriever<T> leaderRetriever,
+ @Nonnull Time timeout,
+ boolean httpsEnabled) {
+ this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
+ this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ this.httpsEnabled = httpsEnabled;
+ localAddress = null;
+ }
+
+ @Override
+ protected void channelRead0(
+ ChannelHandlerContext channelHandlerContext,
+ Routed routed) throws Exception {
+
+ try {
+ if (localAddressFuture.isDone()) {
+ if (localAddress == null) {
+ try {
+ localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ logger.error("Could not obtain local address.", e);
+
+ HandlerUtils.sendErrorResponse(
+ channelHandlerContext,
+ routed.request(),
+ new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
+ HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
+
+ optLeaderConsumer.ifPresent(
+ (T gateway) -> {
+ OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
+ HandlerRedirectUtils.getRedirectAddress(
+ localAddress,
+ gateway,
+ timeout));
+
+ optRedirectAddressConsumer
+ .ifPresent(
+ (CompletableFuture<String> redirectAddressFuture) ->
+ redirectAddressFuture.whenComplete(
+ (String redirectAddress, Throwable throwable) -> {
+ if (throwable != null) {
+ logger.error("Could not retrieve the redirect address.", throwable);
+
+ HandlerUtils.sendErrorResponse(
+ channelHandlerContext,
+ routed.request(),
+ new ErrorResponseBody(throwable),
--- End diff --
same thing regarding exceptions.
---
[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4597#discussion_r139676227
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OptionalConsumer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
+ * REST endpoints.
+ *
+ * @param <T> type of the leader to retrieve
+ */
+@ChannelHandler.Sharable
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private final CompletableFuture<String> localAddressFuture;
+
+ protected final GatewayRetriever<T> leaderRetriever;
+
+ protected final Time timeout;
+
+ /** Whether the web service has https enabled. */
+ protected final boolean httpsEnabled;
+
+ private String localAddress;
+
+ protected RedirectHandler(
+ @Nonnull CompletableFuture<String> localAddressFuture,
+ @Nonnull GatewayRetriever<T> leaderRetriever,
+ @Nonnull Time timeout,
+ boolean httpsEnabled) {
+ this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
+ this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ this.httpsEnabled = httpsEnabled;
+ localAddress = null;
+ }
+
+ @Override
+ protected void channelRead0(
+ ChannelHandlerContext channelHandlerContext,
+ Routed routed) throws Exception {
+
+ try {
+ if (localAddressFuture.isDone()) {
+ if (localAddress == null) {
+ try {
+ localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ logger.error("Could not obtain local address.", e);
+
+ HandlerUtils.sendErrorResponse(
+ channelHandlerContext,
+ routed.request(),
+ new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
+ HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
+
+ optLeaderConsumer.ifPresent(
+ (T gateway) -> {
+ OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
+ HandlerRedirectUtils.getRedirectAddress(
+ localAddress,
+ gateway,
+ timeout));
+
+ optRedirectAddressConsumer
+ .ifPresent(
+ (CompletableFuture<String> redirectAddressFuture) ->
+ redirectAddressFuture.whenComplete(
+ (String redirectAddress, Throwable throwable) -> {
+ if (throwable != null) {
+ logger.error("Could not retrieve the redirect address.", throwable);
+
+ HandlerUtils.sendErrorResponse(
+ channelHandlerContext,
+ routed.request(),
+ new ErrorResponseBody(throwable),
--- End diff --
Same here.
---