Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/08/26 14:40:38 UTC

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4597

    [FLINK-7527] [rest] Let AbstractRestHandler extend RedirectHandler

    ## What is the purpose of the change
    
    By letting the `AbstractRestHandler` extend the `RedirectHandler`, we add redirection
    capabilities to the `AbstractRestHandler`.
    
    This PR is based on #4551.
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as `RedirectHandlerTest`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
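
The purpose stated in this description (a handler gains redirection simply by extending a redirect-aware base class) can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Flink's actual code: `RedirectingHandlerSketch`, `JobsHandlerSketch`, `respondAsLeader` and the string-based request/response shapes are all hypothetical, and the 307 status is assumed here only as a typical choice for a temporary redirect.

```java
/** Hypothetical stand-in for a redirect-aware base class; not Flink's actual API. */
abstract class RedirectingHandlerSketch {

    private final String localAddress;

    RedirectingHandlerSketch(String localAddress) {
        this.localAddress = localAddress;
    }

    /**
     * Shared entry point: if this endpoint is not the leader, answer with a
     * redirect to the leader; otherwise delegate to the subclass.
     */
    final String handle(String leaderAddress, String path) {
        if (!localAddress.equals(leaderAddress)) {
            // Assumed response shape, for illustration only.
            return "307 Location: " + leaderAddress + path;
        }
        return respondAsLeader(path);
    }

    /** Subclasses only implement the "I am the leader" case. */
    protected abstract String respondAsLeader(String path);
}

/** Hypothetical concrete handler that inherits the redirect behaviour. */
final class JobsHandlerSketch extends RedirectingHandlerSketch {

    JobsHandlerSketch(String localAddress) {
        super(localAddress);
    }

    @Override
    protected String respondAsLeader(String path) {
        return "200 handled " + path;
    }
}
```

With this shape, the concrete handler contains no redirect logic of its own; calling `handle` with a leader address different from the local one yields the redirect, and calling it with the local address reaches `respondAsLeader`.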
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink redirectAbstractRestHandler

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4597.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4597
    
----
commit 7835d0a89c8ecdd5b9661ee8c57a9d63a3ed3742
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-14T13:47:04Z

    [FLINK-7522] Add termination future to ClusterEntrypoint
    
    The termination future is completed when the ClusterEntrypoint shuts down. This
    allows for easier testing.

commit 2cdf97f824bc62a82e65f4c160b9ad64de446de4
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-16T12:36:13Z

    [FLINK-7457] Make Dispatcher highly available
    
    This commit introduces a dispatcher leader election and retrieval service to the
    HighAvailabilityServices. Moreover, it adds code such that the Dispatcher now takes
    part in the leader election process using the aforementioned services.
    
    Let Dispatcher participate in leader election
    
    Add test for Dispatcher leader election

commit 04caf85d33ddfc3a4a9b788745b8282c3437d8e2
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-10T08:56:12Z

    [FLINK-7409] [web] Make WebRuntimeMonitor reactive
    
    This commit changes the behaviour of the WebRuntimeMonitor to no longer block serving
    threads by waiting on the result of futures. Instead, the RequestHandler now returns a
    CompletableFuture<FullHttpResponse> which is written out to the Netty channel upon
    completion. This will improve the performance of our WebRuntimeMonitor.

commit 4fa6dedd95555a2d1a91339ff5effda3bc2bd1d5
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-15T10:00:58Z

    [FLINK-7458] Generalize GatewayRetriever for WebRuntimeMonitor
    
    Introduce a generalized GatewayRetriever replacing the JobManagerRetriever. The
    GatewayRetriever fulfills the same purpose as the JobManagerRetriever with the
    ability to retrieve the gateway for an arbitrary endpoint type.

commit 0f9b2ce77e20f25fc95ddeba98f863b86450a72c
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-15T11:55:47Z

    [FLINK-7459] Generalize Flink's redirection logic
    
    Introduce RedirectHandler which can be extended to add redirection functionality to all
    SimpleChannelInboundHandlers. This allows sharing the same functionality across the
    StaticFileServerHandler and the RuntimeMonitorHandlerBase which could now be removed.
    In the future, the AbstractRestHandler will also extend the RedirectHandler.

commit 88aed4f7a198b3994271088b8e19558d399ddd9d
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-17T13:04:19Z

    [FLINK-7527] [rest] Let AbstractRestHandler extend RedirectHandler
    
    By letting the AbstractRestHandler extend the RedirectHandler, we add redirection
    capabilities to the AbstractRestHandler.

----
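
The non-blocking approach described in the FLINK-7409 commit message above (return a future and write the response once it completes, rather than waiting on it in the serving thread) can be sketched with plain `java.util.concurrent` types. This is only an illustration under assumptions: `ReactiveHandlerSketch`, `respondReactive` and the `Consumer<String>` standing in for the Netty channel are hypothetical names, and the response is a plain string rather than a `FullHttpResponse`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical sketch of writing a response when a future completes. */
final class ReactiveHandlerSketch {

    private ReactiveHandlerSketch() {}

    /**
     * Registers a callback on the future instead of blocking on it. The
     * calling (serving) thread returns immediately; the channel is written
     * to by whichever thread completes the future.
     *
     * @param result  future response, assumed to be produced elsewhere
     * @param channel stand-in for the channel the response is written to
     */
    static void respondReactive(CompletableFuture<String> result, Consumer<String> channel) {
        result.whenComplete((response, failure) -> {
            if (failure != null) {
                // Assumed error format, for illustration only.
                channel.accept("500 " + failure.getMessage());
            } else {
                channel.accept(response);
            }
        });
    }
}
```

Nothing is written while the future is still pending, which is the property that keeps the serving threads free.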


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4597#discussion_r139618924
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.util;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +
    +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
    +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +
    +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Utilities for the REST handlers.
    + */
    +public class HandlerUtils {
    +
    +	private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
    +
    +	/**
    +	 * Sends the given response and status code to the given channel.
    +	 *
    +	 * @param channelHandlerContext identifying the open channel
    +	 * @param httpRequest originating http request
    +	 * @param response which should be sent
    +	 * @param statusCode of the message to send
    +	 * @param <P> type of the response
    +	 */
    +	public static <P extends ResponseBody> void sendResponse(
    +			ChannelHandlerContext channelHandlerContext,
    +			HttpRequest httpRequest,
    +			P response,
    +			HttpResponseStatus statusCode) {
    +		StringWriter sw = new StringWriter();
    +		try {
    +			mapper.writeValue(sw, response);
    +		} catch (IOException ioe) {
    +			sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
    +			return;
    +		}
    +		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
    +	}
    +
    +	/**
    +	 * Sends the given error response and status code to the given channel.
    +	 *
    +	 * @param channelHandlerContext identifying the open channel
    +	 * @param httpRequest originating http request
    +	 * @param errorMessage which should be sent
    +	 * @param statusCode of the message to send
    +	 */
    +	public static void sendErrorResponse(
    +			ChannelHandlerContext channelHandlerContext,
    +			HttpRequest httpRequest,
    +			ErrorResponseBody errorMessage,
    +			HttpResponseStatus statusCode) {
    +
    +		StringWriter sw = new StringWriter();
    +		try {
    +			mapper.writeValue(sw, errorMessage);
    +		} catch (IOException e) {
    +			// this should never happen
    +			sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
    +		}
    +		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
    +	}
    +
    +	/**
    +	 * Sends the given response and status code to the given channel.
    +	 *  @param channelHandlerContext identifying the open channel
    --- End diff --
    
    indentation


---

[GitHub] flink issue #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend Redire...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4597
  
    Rebased onto the latest master.


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4597#discussion_r139675681
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.util;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +
    +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
    +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +
    +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Utilities for the REST handlers.
    + */
    +public class HandlerUtils {
    +
    +	private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
    +
    +	/**
    +	 * Sends the given response and status code to the given channel.
    +	 *
    +	 * @param channelHandlerContext identifying the open channel
    +	 * @param httpRequest originating http request
    +	 * @param response which should be sent
    +	 * @param statusCode of the message to send
    +	 * @param <P> type of the response
    +	 */
    +	public static <P extends ResponseBody> void sendResponse(
    +			ChannelHandlerContext channelHandlerContext,
    +			HttpRequest httpRequest,
    +			P response,
    +			HttpResponseStatus statusCode) {
    +		StringWriter sw = new StringWriter();
    +		try {
    +			mapper.writeValue(sw, response);
    +		} catch (IOException ioe) {
    +			sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
    +			return;
    +		}
    +		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
    +	}
    +
    +	/**
    +	 * Sends the given error response and status code to the given channel.
    +	 *
    +	 * @param channelHandlerContext identifying the open channel
    +	 * @param httpRequest originating http request
    +	 * @param errorMessage which should be sent
    +	 * @param statusCode of the message to send
    +	 */
    +	public static void sendErrorResponse(
    +			ChannelHandlerContext channelHandlerContext,
    +			HttpRequest httpRequest,
    +			ErrorResponseBody errorMessage,
    +			HttpResponseStatus statusCode) {
    +
    +		StringWriter sw = new StringWriter();
    +		try {
    +			mapper.writeValue(sw, errorMessage);
    +		} catch (IOException e) {
    +			// this should never happen
    +			sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
    +		}
    +		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);
    +	}
    +
    +	/**
    +	 * Sends the given response and status code to the given channel.
    +	 *  @param channelHandlerContext identifying the open channel
    --- End diff --
    
    good catch. Will fix it.


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4597#discussion_r139676205
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
    +import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.OptionalConsumer;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
    + * REST endpoints.
    + *
    + * @param <T> type of the leader to retrieve
    + */
    +@ChannelHandler.Sharable
    +public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
    +
    +	protected final Logger logger = LoggerFactory.getLogger(getClass());
    +
    +	private final CompletableFuture<String> localAddressFuture;
    +
    +	protected final GatewayRetriever<T> leaderRetriever;
    +
    +	protected final Time timeout;
    +
    +	/** Whether the web service has https enabled. */
    +	protected final boolean httpsEnabled;
    +
    +	private String localAddress;
    +
    +	protected RedirectHandler(
    +			@Nonnull CompletableFuture<String> localAddressFuture,
    +			@Nonnull GatewayRetriever<T> leaderRetriever,
    +			@Nonnull Time timeout,
    +			boolean httpsEnabled) {
    +		this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
    +		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
    +		this.timeout = Preconditions.checkNotNull(timeout);
    +		this.httpsEnabled = httpsEnabled;
    +		localAddress = null;
    +	}
    +
    +	@Override
    +	protected void channelRead0(
    +		ChannelHandlerContext channelHandlerContext,
    +		Routed routed) throws Exception {
    +
    +		try {
    +			if (localAddressFuture.isDone()) {
    +				if (localAddress == null) {
    +					try {
    +						localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +					} catch (Exception e) {
    +						logger.error("Could not obtain local address.", e);
    +
    +						HandlerUtils.sendErrorResponse(
    +							channelHandlerContext,
    +							routed.request(),
    +							new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
    --- End diff --
    
    Alright. Will remove them.


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4597


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4597#discussion_r138845374
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
    +import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.OptionalConsumer;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
    + * REST endpoints.
    + *
    + * @param <T> type of the leader to retrieve
    + */
    +@ChannelHandler.Sharable
    +public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
    +
    +	protected final Logger logger = LoggerFactory.getLogger(getClass());
    +
    +	private final CompletableFuture<String> localAddressFuture;
    +
    +	protected final GatewayRetriever<T> leaderRetriever;
    +
    +	protected final Time timeout;
    +
    +	/** Whether the web service has https enabled. */
    +	protected final boolean httpsEnabled;
    +
    +	private String localAddress;
    +
    +	protected RedirectHandler(
    +			@Nonnull CompletableFuture<String> localAddressFuture,
    +			@Nonnull GatewayRetriever<T> leaderRetriever,
    +			@Nonnull Time timeout,
    +			boolean httpsEnabled) {
    +		this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
    +		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
    +		this.timeout = Preconditions.checkNotNull(timeout);
    +		this.httpsEnabled = httpsEnabled;
    +		localAddress = null;
    +	}
    +
    +	@Override
    +	protected void channelRead0(
    +		ChannelHandlerContext channelHandlerContext,
    +		Routed routed) throws Exception {
    +
    +		try {
    +			if (localAddressFuture.isDone()) {
    +				if (localAddress == null) {
    +					try {
    +						localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +					} catch (Exception e) {
    +						logger.error("Could not obtain local address.", e);
    +
    +						HandlerUtils.sendErrorResponse(
    +							channelHandlerContext,
    +							routed.request(),
    +							new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
    +							HttpResponseStatus.INTERNAL_SERVER_ERROR);
    +					}
    +				}
    +
    +				OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
    +
    +				optLeaderConsumer.ifPresent(
    +					(T gateway) -> {
    +						OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
    +							HandlerRedirectUtils.getRedirectAddress(
    +								localAddress,
    +								gateway,
    +								timeout));
    +
    +						optRedirectAddressConsumer
    +							.ifPresent(
    +								(CompletableFuture<String> redirectAddressFuture) ->
    +									redirectAddressFuture.whenComplete(
    --- End diff --
    
    The HTTP redirection seems unnecessary. Why not pass the gateway to the handler in all cases? This would make the handlers act as a proxy to the actual leader which, as I understand, is the behavior we want long-term anyway.


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4597#discussion_r138849512
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
    +import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.OptionalConsumer;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
    + * REST endpoints.
    + *
    + * @param <T> type of the leader to retrieve
    + */
    +@ChannelHandler.Sharable
    +public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
    +
    +	protected final Logger logger = LoggerFactory.getLogger(getClass());
    +
    +	private final CompletableFuture<String> localAddressFuture;
    +
    +	protected final GatewayRetriever<T> leaderRetriever;
    +
    +	protected final Time timeout;
    +
    +	/** Whether the web service has https enabled. */
    +	protected final boolean httpsEnabled;
    +
    +	private String localAddress;
    +
    +	protected RedirectHandler(
    +			@Nonnull CompletableFuture<String> localAddressFuture,
    +			@Nonnull GatewayRetriever<T> leaderRetriever,
    +			@Nonnull Time timeout,
    +			boolean httpsEnabled) {
    +		this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
    +		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
    +		this.timeout = Preconditions.checkNotNull(timeout);
    +		this.httpsEnabled = httpsEnabled;
    +		localAddress = null;
    +	}
    +
    +	@Override
    +	protected void channelRead0(
    +		ChannelHandlerContext channelHandlerContext,
    +		Routed routed) throws Exception {
    +
    +		try {
    +			if (localAddressFuture.isDone()) {
    +				if (localAddress == null) {
    +					try {
    +						localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +					} catch (Exception e) {
    +						logger.error("Could not obtain local address.", e);
    +
    +						HandlerUtils.sendErrorResponse(
    +							channelHandlerContext,
    +							routed.request(),
    +							new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
    +							HttpResponseStatus.INTERNAL_SERVER_ERROR);
    +					}
    +				}
    +
    +				OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
    +
    +				optLeaderConsumer.ifPresent(
    +					(T gateway) -> {
    +						OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
    +							HandlerRedirectUtils.getRedirectAddress(
    +								localAddress,
    +								gateway,
    +								timeout));
    +
    +						optRedirectAddressConsumer
    +							.ifPresent(
    +								(CompletableFuture<String> redirectAddressFuture) ->
    +									redirectAddressFuture.whenComplete(
    --- End diff --
    
    This would be possible if all REST handler related RPCs returned serializable values. Currently, some of the REST handlers expect an `ExecutionGraph` as the result and thus only work if the REST endpoint runs in the same `ActorSystem` as the `JobManager`. Once this has been changed, we will no longer need the HTTP redirect. But right now, this is not the case.
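
The constraint in the comment above (a handler can only act as a proxy if the RPC result can be shipped to a remote endpoint) can be illustrated with standard Java serialization. This is a hedged sketch: `SerializabilitySketch`, `JobSummary`, `LocalOnlyGraph` and `canCrossProcessBoundary` are hypothetical names invented for the example, and it makes no claim about how `ExecutionGraph` itself is implemented or which serialization mechanism Flink's RPC layer uses.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical check of whether an RPC result could be sent to another process. */
final class SerializabilitySketch {

    private SerializabilitySketch() {}

    /** Hypothetical result type that can be shipped to a remote endpoint. */
    static final class JobSummary implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName;

        JobSummary(String jobName) {
            this.jobName = jobName;
        }
    }

    /** Hypothetical result type that is only usable inside the same process. */
    static final class LocalOnlyGraph {
        final Thread owner = Thread.currentThread();
    }

    /**
     * Returns true if the object survives Java serialization. A class that
     * does not implement Serializable makes writeObject throw
     * NotSerializableException, which is an IOException.
     */
    static boolean canCrossProcessBoundary(Object rpcResult) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(rpcResult);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

In this picture, a handler whose result behaves like `JobSummary` could answer from any endpoint, while one that needs something like `LocalOnlyGraph` has to run next to the leader, which is the case the HTTP redirect covers.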


---

[GitHub] flink issue #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend Redire...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4597
  
    Thanks for the review, @zentol. I've addressed your comments and rebased onto the latest master. If Travis gives the green light, I'll merge the PR.


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4597#discussion_r139620406
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
    +import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.OptionalConsumer;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
    + * REST endpoints.
    + *
    + * @param <T> type of the leader to retrieve
    + */
    +@ChannelHandler.Sharable
    +public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
    +
    +	protected final Logger logger = LoggerFactory.getLogger(getClass());
    +
    +	private final CompletableFuture<String> localAddressFuture;
    +
    +	protected final GatewayRetriever<T> leaderRetriever;
    +
    +	protected final Time timeout;
    +
    +	/** Whether the web service has https enabled. */
    +	protected final boolean httpsEnabled;
    +
    +	private String localAddress;
    +
    +	protected RedirectHandler(
    +			@Nonnull CompletableFuture<String> localAddressFuture,
    +			@Nonnull GatewayRetriever<T> leaderRetriever,
    +			@Nonnull Time timeout,
    +			boolean httpsEnabled) {
    +		this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
    +		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
    +		this.timeout = Preconditions.checkNotNull(timeout);
    +		this.httpsEnabled = httpsEnabled;
    +		localAddress = null;
    +	}
    +
    +	@Override
    +	protected void channelRead0(
    +		ChannelHandlerContext channelHandlerContext,
    +		Routed routed) throws Exception {
    +
    +		try {
    +			if (localAddressFuture.isDone()) {
    +				if (localAddress == null) {
    +					try {
    +						localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +					} catch (Exception e) {
    +						logger.error("Could not obtain local address.", e);
    +
    +						HandlerUtils.sendErrorResponse(
    +							channelHandlerContext,
    +							routed.request(),
    +							new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
    --- End diff --
    
    I would prefer if we did not expose exception stacktraces directly in the API.
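
One possible reading of this suggestion, as a minimal sketch: log the full exception on the server and put only a fixed, human-readable message into the response body. `SafeErrorBody` and `forFailure` are hypothetical names used for illustration; they are not Flink's `ErrorResponseBody` API.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for an error response body; not Flink's ErrorResponseBody. */
final class SafeErrorBody {

    /** Messages that are sent to the client. */
    final List<String> errors;

    SafeErrorBody(String error) {
        this.errors = Collections.singletonList(error);
    }

    /**
     * Builds a response body from a public message only. The cause is
     * accepted so that callers can log it next to this call, but it is
     * never copied into the body, so no class names, exception messages
     * or stack frames reach the client.
     */
    static SafeErrorBody forFailure(String publicMessage, Throwable cause) {
        // Assumption: the caller logs 'cause' server-side, e.g. via an SLF4J logger.
        return new SafeErrorBody(publicMessage);
    }
}
```

With this shape, a handler would keep its `logger.error(..., e)` call and pass only the message string on to the client.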


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4597#discussion_r139680566
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java ---
    @@ -38,6 +40,10 @@ public ErrorResponseBody(String error) {
     		this(Collections.singletonList(error));
     	}
     
    +	public ErrorResponseBody(Throwable throwable) {
    --- End diff --
    
    This should now be unused.


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4597#discussion_r139805450
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java ---
    @@ -38,6 +40,10 @@ public ErrorResponseBody(String error) {
     		this(Collections.singletonList(error));
     	}
     
    +	public ErrorResponseBody(Throwable throwable) {
    --- End diff --
    
    True. Will remove it.


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4597#discussion_r139624460
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
    +import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.OptionalConsumer;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
    + * REST endpoints.
    + *
    + * @param <T> type of the leader to retrieve
    + */
    +@ChannelHandler.Sharable
    +public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
    +
    +	protected final Logger logger = LoggerFactory.getLogger(getClass());
    +
    +	private final CompletableFuture<String> localAddressFuture;
    +
    +	protected final GatewayRetriever<T> leaderRetriever;
    +
    +	protected final Time timeout;
    +
    +	/** Whether the web service has https enabled. */
    +	protected final boolean httpsEnabled;
    +
    +	private String localAddress;
    +
    +	protected RedirectHandler(
    +			@Nonnull CompletableFuture<String> localAddressFuture,
    +			@Nonnull GatewayRetriever<T> leaderRetriever,
    +			@Nonnull Time timeout,
    +			boolean httpsEnabled) {
    +		this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
    +		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
    +		this.timeout = Preconditions.checkNotNull(timeout);
    +		this.httpsEnabled = httpsEnabled;
    +		localAddress = null;
    +	}
    +
    +	@Override
    +	protected void channelRead0(
    +		ChannelHandlerContext channelHandlerContext,
    +		Routed routed) throws Exception {
    +
    +		try {
    +			if (localAddressFuture.isDone()) {
    +				if (localAddress == null) {
    +					try {
    +						localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +					} catch (Exception e) {
    +						logger.error("Could not obtain local address.", e);
    +
    +						HandlerUtils.sendErrorResponse(
    +							channelHandlerContext,
    +							routed.request(),
    +							new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
    +							HttpResponseStatus.INTERNAL_SERVER_ERROR);
    +					}
    +				}
    +
    +				OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
    +
    +				optLeaderConsumer.ifPresent(
    +					(T gateway) -> {
    +						OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
    +							HandlerRedirectUtils.getRedirectAddress(
    +								localAddress,
    +								gateway,
    +								timeout));
    +
    +						optRedirectAddressConsumer
    +							.ifPresent(
    +								(CompletableFuture<String> redirectAddressFuture) ->
    +									redirectAddressFuture.whenComplete(
    +										(String redirectAddress, Throwable throwable) -> {
    +											if (throwable != null) {
    +												logger.error("Could not retrieve the redirect address.", throwable);
    +
    +												HandlerUtils.sendErrorResponse(
    +													channelHandlerContext,
    +													routed.request(),
    +													new ErrorResponseBody(throwable),
    --- End diff --
    
    Same concern here regarding exposing exceptions in the response.


---

[GitHub] flink pull request #4597: [FLINK-7527] [rest] Let AbstractRestHandler extend...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4597#discussion_r139676227
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
    +import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.OptionalConsumer;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * {@link SimpleChannelInboundHandler} which encapsulates the redirection logic for the
    + * REST endpoints.
    + *
    + * @param <T> type of the leader to retrieve
    + */
    +@ChannelHandler.Sharable
    +public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
    +
    +	protected final Logger logger = LoggerFactory.getLogger(getClass());
    +
    +	private final CompletableFuture<String> localAddressFuture;
    +
    +	protected final GatewayRetriever<T> leaderRetriever;
    +
    +	protected final Time timeout;
    +
    +	/** Whether the web service has https enabled. */
    +	protected final boolean httpsEnabled;
    +
    +	private String localAddress;
    +
    +	protected RedirectHandler(
    +			@Nonnull CompletableFuture<String> localAddressFuture,
    +			@Nonnull GatewayRetriever<T> leaderRetriever,
    +			@Nonnull Time timeout,
    +			boolean httpsEnabled) {
    +		this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
    +		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
    +		this.timeout = Preconditions.checkNotNull(timeout);
    +		this.httpsEnabled = httpsEnabled;
    +		localAddress = null;
    +	}
    +
    +	@Override
    +	protected void channelRead0(
    +		ChannelHandlerContext channelHandlerContext,
    +		Routed routed) throws Exception {
    +
    +		try {
    +			if (localAddressFuture.isDone()) {
    +				if (localAddress == null) {
    +					try {
    +						localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +					} catch (Exception e) {
    +						logger.error("Could not obtain local address.", e);
    +
    +						HandlerUtils.sendErrorResponse(
    +							channelHandlerContext,
    +							routed.request(),
    +							new ErrorResponseBody(new FlinkException("Could not obtain local address.", e)),
    +							HttpResponseStatus.INTERNAL_SERVER_ERROR);
    +					}
    +				}
    +
    +				OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
    +
    +				optLeaderConsumer.ifPresent(
    +					(T gateway) -> {
    +						OptionalConsumer<CompletableFuture<String>> optRedirectAddressConsumer = OptionalConsumer.of(
    +							HandlerRedirectUtils.getRedirectAddress(
    +								localAddress,
    +								gateway,
    +								timeout));
    +
    +						optRedirectAddressConsumer
    +							.ifPresent(
    +								(CompletableFuture<String> redirectAddressFuture) ->
    +									redirectAddressFuture.whenComplete(
    +										(String redirectAddress, Throwable throwable) -> {
    +											if (throwable != null) {
    +												logger.error("Could not retrieve the redirect address.", throwable);
    +
    +												HandlerUtils.sendErrorResponse(
    +													channelHandlerContext,
    +													routed.request(),
    +													new ErrorResponseBody(throwable),
    --- End diff --
    
    Same here.


---