Posted to issues@flink.apache.org by EronWright <gi...@git.apache.org> on 2017/10/04 04:18:42 UTC

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

GitHub user EronWright opened a pull request:

    https://github.com/apache/flink/pull/4767

    [FLINK-7738] [flip-6] Create WebSocket handler (server, client)

    
    ## What is the purpose of the change
    
    Introduces WebSocket support for the FLIP-6 REST server and client.
    
    The basic idea is to use the normal REST handler to initiate a WebSocket upgrade, so that the normal request parsing logic may be reused. For example, a REST method of `/jobs/:jobid/subscribe` may be developed using a normal REST handler. The handler responds such that the server initiates the upgrade procedure rather than producing a normal REST response. A new type of handler based on `AbstractWebSocketMessageHandler` is then installed into the pipeline for subsequent interaction.
    
    Netty's `ChannelGroup` is leveraged to act as an event bus to easily dispatch a message to one or more channels based on a routing key.  In the above example, the routing key might be `jobid`, meaning that a given channel is listening to events related to a certain job.   It is expected that a concrete subclass of `RestServerEndpoint` create one or more `KeyedChannelRouter` instances as needed for its handlers, and then write messages as it sees fit.
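
As a rough illustration of the keyed routing idea described above, here is a minimal plain-Java sketch. It is not the `KeyedChannelRouter` from the PR: the class name `KeyedRouterSketch`, its methods, and the use of `Consumer` as a stand-in for a Netty channel are all assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for a keyed channel router; not the class from the PR. */
class KeyedRouterSketch<K, M> {

    // Each routing key maps to its subscribers (Consumers stand in for Netty channels).
    private final Map<K, Set<Consumer<M>>> subscribers = new ConcurrentHashMap<>();

    /** Registers a subscriber for the given routing key. */
    void register(K key, Consumer<M> channel) {
        subscribers.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(channel);
    }

    /** Removes a subscriber, for example when its channel closes. */
    void unregister(K key, Consumer<M> channel) {
        Set<Consumer<M>> set = subscribers.get(key);
        if (set != null) {
            set.remove(channel);
        }
    }

    /** Delivers the message to every subscriber of the key and returns the recipient count. */
    int write(K key, M message) {
        Set<Consumer<M>> set = subscribers.getOrDefault(key, Collections.emptySet());
        int delivered = 0;
        for (Consumer<M> channel : set) {
            channel.accept(message);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        KeyedRouterSketch<String, String> router = new KeyedRouterSketch<>();
        List<String> jobOneEvents = new ArrayList<>();
        router.register("job-1", jobOneEvents::add);
        router.write("job-1", "started");   // reaches the job-1 subscriber
        router.write("job-2", "started");   // no subscriber for job-2, nothing delivered
        System.out.println(jobOneEvents);
    }
}
```

In the real server the subscribers would be channels held in a Netty `ChannelGroup`, which also takes care of dropping closed channels; the sketch leaves that to an explicit `unregister` call.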
    
    The client was similarly adapted to open a `WebSocket` with associated listeners.  Consider the work to be a stop-gap pending further discussion.
    
    The `RestEndpointITCase` test was enhanced with an end-to-end demonstration.   A separate unit test for `AbstractRestHandler` was also introduced.
    
    ## Brief change log
    - Introduce `AbstractWebSocketMessageHandler` to handle inbound and outbound websocket messages.
    - Introduce `WebSocketUpgradeResponseBody` as a special REST response that triggers a websocket upgrade.
    - Update `AbstractRestHandler` to handle websocket upgrades.
    - Introduce `KeyedChannelRouter` to route websocket messages to interested channels.
    - Update `RestClient` with a new method, `sendWebSocketRequest`.
    - Introduce `WebSocket` and `WebSocketListener`.
    - Update `RestEndpointITCase` with end-to-end websocket test.
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    - `AbstractRestHandlerTest`
    - `RestEndpointITCase`
    - `AbstractWebSocketMessageHandlerTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/EronWright/flink FLINK-7738-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4767.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4767
    
----
commit f56168846731ab4205a2b04a42285e0b3a3f1972
Author: Wright, Eron <er...@emc.com>
Date:   2017-10-04T00:26:56Z

    [FLINK-7738] [flip-6] Create WebSocket handler (server, client)

----


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142712091
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java ---
    @@ -325,6 +406,98 @@ public TestParameters getUnresolvedMessageParameters() {
     		}
     	}
     
    +	private static class TestWebSocketOperation {
    +
    +		private static class WsParameters extends MessageParameters {
    +			private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
    +
    +			@Override
    +			public Collection<MessagePathParameter<?>> getPathParameters() {
    +				return Collections.singleton(jobIDPathParameter);
    +			}
    +
    +			@Override
    +			public Collection<MessageQueryParameter<?>> getQueryParameters() {
    +				return Collections.emptyList();
    +			}
    +		}
    +
    +		static class WsHeaders implements MessageHeaders<EmptyRequestBody, WebSocketUpgradeResponseBody, WsParameters> {
    +
    +			@Override
    +			public HttpMethodWrapper getHttpMethod() {
    +				return HttpMethodWrapper.GET;
    +			}
    +
    +			@Override
    +			public String getTargetRestEndpointURL() {
    +				return "/test/:jobid/subscribe";
    +			}
    +
    +			@Override
    +			public Class<EmptyRequestBody> getRequestClass() {
    +				return EmptyRequestBody.class;
    +			}
    +
    +			@Override
    +			public Class<WebSocketUpgradeResponseBody> getResponseClass() {
    +				return WebSocketUpgradeResponseBody.class;
    +			}
    +
    +			@Override
    +			public HttpResponseStatus getResponseStatusCode() {
    +				return HttpResponseStatus.OK;
    +			}
    +
    +			@Override
    +			public WsParameters getUnresolvedMessageParameters() {
    +				return new WsParameters();
    +			}
    +		}
    +
    +		static class WsRestHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, WebSocketUpgradeResponseBody, WsParameters> {
    +
    +			private final TestEventProvider eventProvider;
    +
    +			WsRestHandler(
    +				CompletableFuture<String> localAddressFuture,
    +				GatewayRetriever<RestfulGateway> leaderRetriever,
    +				TestEventProvider eventProvider,
    +				Time timeout) {
    +				super(localAddressFuture, leaderRetriever, timeout, new WsHeaders());
    +				this.eventProvider = eventProvider;
    +			}
    +
    +			@Override
    +			protected CompletableFuture<WebSocketUpgradeResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, WsParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
    +				JobID jobID = request.getPathParameter(JobIDPathParameter.class);
    +				Assert.assertEquals(PATH_JOB_ID, jobID);
    +				ChannelHandler messageHandler = new WsMessageHandler(eventProvider, jobID);
    --- End diff --
    
    The main value of `AbstractRestHandler` in this scenario is in decoding the HTTP request into a `HandlerRequest`.   By factoring that code into a `MessageToMessageDecoder` we could reuse it and avoid the need for `AbstractRestHandler` in this scenario.
    
    In other words, the 'Netty way' would be to use a pipeline of handlers, which is more flexible than an inheritance hierarchy in my opinion.
    
    Normal operation: `[HttpCodec] -> [RestRequestDecoder] -> [RestHandler]`
    WebSocket operation: `[HttpCodec] -> [RestRequestDecoder] -> [WebSocketHandler]`
    
    We could go further by encapsulating each operation in a handler that simply adds the appropriate child handlers, similar to how `HttpCodec` simply adds an encoder and decoder to the pipeline.   WDYT?
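
The pipeline composition suggested above can be modelled in a few lines of plain Java. This is only a toy sketch: the stage names mirror the comment, but `PipelineSketch`, its methods, and the string-based messages are hypothetical stand-ins rather than Netty or Flink types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy model of a handler pipeline; all types here are hypothetical stand-ins. */
class PipelineSketch {

    private final List<String> names = new ArrayList<>();
    private Function<Object, Object> chain = Function.identity();

    /** Appends a named stage; each stage transforms the message and passes it on. */
    PipelineSketch addLast(String name, Function<Object, Object> stage) {
        names.add(name);
        chain = chain.andThen(stage);
        return this;
    }

    Object fire(Object msg) {
        return chain.apply(msg);
    }

    List<String> names() {
        return names;
    }

    // Shared prefix: both operations reuse the same codec and request decoder stages.
    private static PipelineSketch base() {
        return new PipelineSketch()
            .addLast("HttpCodec", msg -> msg)
            .addLast("RestRequestDecoder", raw -> "decoded(" + raw + ")");
    }

    /** Normal operation: the decoded request ends in a REST handler. */
    static PipelineSketch rest() {
        return base().addLast("RestHandler", req -> "response for " + req);
    }

    /** WebSocket operation: the same decoded request ends in a WebSocket handler. */
    static PipelineSketch webSocket() {
        return base().addLast("WebSocketHandler", req -> "upgrade for " + req);
    }

    public static void main(String[] args) {
        System.out.println(rest().names());
        System.out.println(webSocket().fire("/jobs/1/subscribe"));
    }
}
```

The point of the sketch is that only the terminal stage differs between the two operations, so the decoding logic is shared by composition rather than by inheriting from a common base handler.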
    



---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142633088
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---
    @@ -157,7 +160,12 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, Routed routed, T
     						HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
     					}
     				} else {
    -					HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode());
    +					if (resp instanceof WebSocketUpgradeResponseBody) {
    +						upgradeToWebSocket(ctx, routed, (WebSocketUpgradeResponseBody) resp);
    --- End diff --
    
    Help me out here. After the upgrade is complete, which parts of the `AbstractRestHandler` class are still used?


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142700416
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -275,4 +301,121 @@ public HttpResponseStatus getHttpResponseStatus() {
     			return httpResponseStatus;
     		}
     	}
    +
    +	public <M extends MessageHeaders<EmptyRequestBody, WebSocketUpgradeResponseBody, U>, U extends MessageParameters, R extends ResponseBody> CompletableFuture<WebSocket> sendWebSocketRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, Class<R> messageClazz, WebSocketListener... listeners) throws IOException {
    --- End diff --
    
    I too was unhappy about using a special response body, but felt that the alternative required some rework of the REST handler that was best done in a follow-up.  With some rework we can eliminate the funky response body.


---

[GitHub] flink issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, c...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/4767
  
    I don't much like the use of `RequestBody` and `ResponseBody` here, or even that the WebSocket distinguishes between client and server messages.  Honestly a `MessageBody` marker interface may suffice.   WDYT?


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142641670
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocket.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.websocket;
    +
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
    +
    +/**
    + * A WebSocket for sending and receiving messages.
    + */
    +public interface WebSocket {
    +
    +	/**
    +	 * Adds a listener for websocket messages.
    +	 */
    +	void addListener(WebSocketListener listener);
    +
    +	/**
    +	 * Sends a message.
    +	 */
    +	ChannelFuture send(ResponseBody message);
    --- End diff --
    
    My understanding is that the WebSocket interface is only used on the client, so shouldn't this be typed to `RequestBody`?


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142639095
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocket.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.websocket;
    +
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
    +
    +/**
    + * A WebSocket for sending and receiving messages.
    + */
    +public interface WebSocket {
    --- End diff --
    
    Similar to the `WebSocketListener`, this should have a type parameter.
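
One possible shape for a typed variant of the interface is sketched below. Everything here is an assumption for illustration: the names `TypedWebSocket`, `TypedWebSocketListener`, and `LoopbackWebSocket` do not come from the PR, and `CompletableFuture` is swapped in for Netty's `ChannelFuture` so the sketch compiles without Netty on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical listener typed to the inbound message type. */
interface TypedWebSocketListener<I> {
    void onMessage(I message);
}

/**
 * Hypothetical typed WebSocket: O is the type this side sends, I the type it receives.
 */
interface TypedWebSocket<O, I> {
    void addListener(TypedWebSocketListener<I> listener);

    CompletableFuture<Void> send(O message);
}

/** In-memory loopback that echoes sent messages to its listeners; for demonstration only. */
class LoopbackWebSocket<T> implements TypedWebSocket<T, T> {

    private final List<TypedWebSocketListener<T>> listeners = new ArrayList<>();

    @Override
    public void addListener(TypedWebSocketListener<T> listener) {
        listeners.add(listener);
    }

    @Override
    public CompletableFuture<Void> send(T message) {
        for (TypedWebSocketListener<T> listener : listeners) {
            listener.onMessage(message);
        }
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        TypedWebSocket<String, String> socket = new LoopbackWebSocket<>();
        socket.addListener(message -> System.out.println("received " + message));
        socket.send("ping").join();
    }
}
```

With both directions typed, a client-side socket could be declared with a request type for `O` and a response type for `I`, which would also address the `RequestBody` versus `ResponseBody` question for `send`.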


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142634775
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java ---
    @@ -325,6 +406,98 @@ public TestParameters getUnresolvedMessageParameters() {
     		}
     	}
     
    +	private static class TestWebSocketOperation {
    +
    +		private static class WsParameters extends MessageParameters {
    +			private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
    +
    +			@Override
    +			public Collection<MessagePathParameter<?>> getPathParameters() {
    +				return Collections.singleton(jobIDPathParameter);
    +			}
    +
    +			@Override
    +			public Collection<MessageQueryParameter<?>> getQueryParameters() {
    +				return Collections.emptyList();
    +			}
    +		}
    +
    +		static class WsHeaders implements MessageHeaders<EmptyRequestBody, WebSocketUpgradeResponseBody, WsParameters> {
    +
    +			@Override
    +			public HttpMethodWrapper getHttpMethod() {
    +				return HttpMethodWrapper.GET;
    +			}
    +
    +			@Override
    +			public String getTargetRestEndpointURL() {
    +				return "/test/:jobid/subscribe";
    +			}
    +
    +			@Override
    +			public Class<EmptyRequestBody> getRequestClass() {
    +				return EmptyRequestBody.class;
    +			}
    +
    +			@Override
    +			public Class<WebSocketUpgradeResponseBody> getResponseClass() {
    +				return WebSocketUpgradeResponseBody.class;
    +			}
    +
    +			@Override
    +			public HttpResponseStatus getResponseStatusCode() {
    +				return HttpResponseStatus.OK;
    +			}
    +
    +			@Override
    +			public WsParameters getUnresolvedMessageParameters() {
    +				return new WsParameters();
    +			}
    +		}
    +
    +		static class WsRestHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, WebSocketUpgradeResponseBody, WsParameters> {
    +
    +			private final TestEventProvider eventProvider;
    +
    +			WsRestHandler(
    +				CompletableFuture<String> localAddressFuture,
    +				GatewayRetriever<RestfulGateway> leaderRetriever,
    +				TestEventProvider eventProvider,
    +				Time timeout) {
    +				super(localAddressFuture, leaderRetriever, timeout, new WsHeaders());
    +				this.eventProvider = eventProvider;
    +			}
    +
    +			@Override
    +			protected CompletableFuture<WebSocketUpgradeResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, WsParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
    +				JobID jobID = request.getPathParameter(JobIDPathParameter.class);
    +				Assert.assertEquals(PATH_JOB_ID, jobID);
    +				ChannelHandler messageHandler = new WsMessageHandler(eventProvider, jobID);
    --- End diff --
    
    If this is what an `AbstractRestHandler` implementation for WebSockets would actually look like, I'm questioning the benefit of implementing it as an `AbstractRestHandler` in the first place.
    
    An explicit `AbstractWebSocketRestHandler` class could have an abstract `initializeWebSocket(HandlerRequest ...)` method instead of hacking it into `AbstractRestHandler#handleRequest`, and a separate method for creating the event provider that is typed to the actual response we're sending back.
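
A dedicated base class along those lines might look roughly like the following plain-Java sketch. It is an assumption-laden illustration, not code from the PR: `WebSocketRestHandlerSketch`, `parseParameters`, `handleUpgrade`, and `JobEventsHandler` are hypothetical names, a `Map` stands in for the routed path parameters, and a `Consumer` stands in for the outbound side of the channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical base class: P is the resolved parameter type, E the outbound event type.
 */
abstract class WebSocketRestHandlerSketch<P, E> {

    /** Resolves request parameters; stand-in for the decoding done by the REST handler. */
    protected abstract P parseParameters(Map<String, String> pathParams);

    /** Called once parameters are resolved, to wire up the connection. */
    protected abstract void initializeWebSocket(P parameters, Consumer<E> outbound);

    /** Template method: decode the request first, then hand over to the subclass. */
    final void handleUpgrade(Map<String, String> pathParams, Consumer<E> outbound) {
        P parameters = parseParameters(pathParams);
        initializeWebSocket(parameters, outbound);
    }
}

/** Example subclass that subscribes a connection to events of a single job. */
class JobEventsHandler extends WebSocketRestHandlerSketch<String, String> {

    @Override
    protected String parseParameters(Map<String, String> pathParams) {
        String jobId = pathParams.get("jobid");
        if (jobId == null) {
            throw new IllegalArgumentException("Missing mandatory parameter: jobid");
        }
        return jobId;
    }

    @Override
    protected void initializeWebSocket(String jobId, Consumer<String> outbound) {
        outbound.accept("subscribed to job " + jobId);
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("jobid", "42");
        List<String> events = new ArrayList<>();
        new JobEventsHandler().handleUpgrade(params, events::add);
        System.out.println(events);
    }
}
```

The outbound type parameter is what lets the handler be typed to the messages actually sent back, instead of to a placeholder upgrade response.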


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142696121
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---
    @@ -157,7 +160,12 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, Routed routed, T
     						HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
     					}
     				} else {
    -					HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode());
    +					if (resp instanceof WebSocketUpgradeResponseBody) {
    +						upgradeToWebSocket(ctx, routed, (WebSocketUpgradeResponseBody) resp);
    --- End diff --
    
    The REST handler is not active after the upgrade is complete, and it would be harmless to remove it from the pipeline.  The message handler takes over, reading and writing websocket frames based on typed messages.


---

[GitHub] flink issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, c...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/4767
  
    @tillrohrmann do you need a websocket yet?


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142634073
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -275,4 +301,121 @@ public HttpResponseStatus getHttpResponseStatus() {
     			return httpResponseStatus;
     		}
     	}
    +
    +	public <M extends MessageHeaders<EmptyRequestBody, WebSocketUpgradeResponseBody, U>, U extends MessageParameters, R extends ResponseBody> CompletableFuture<WebSocket> sendWebSocketRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, Class<R> messageClazz, WebSocketListener... listeners) throws IOException {
    --- End diff --
    
    I really dislike how the `WebSocketUpgradeResponseBody` is defined as the response in the headers. Not only is this not the actual response we're getting back (that would be R), we now also introduce an arbitrary response type, which voids the type safety and prevents us from auto-generating proper documentation.
    
    The `MessageHeaders` are very much a high-level, user-facing specification, but here we're using them for the setup of the websockets, which is a relatively low-level affair.


---

[GitHub] flink issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, c...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4767
  
    Hi @EronWright, yes, I think we still need support for web sockets. The first REST-based client won't use this, but later on we should definitely add this functionality. At the moment we are trying hard to make Flip-6 feature-equivalent to the old distributed architecture, and therefore we couldn't make progress here. Once that has been done, we should revisit this PR.


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r144381682
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractWebSocketHandler.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageParameters;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.WebSocketSpecification;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
    +import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.concurrent.CompletableFuture;
    +
    +/**
    + * A channel handler for WebSocket resources.
    + *
    + * <p>This handler handles handshaking and ongoing messaging with a WebSocket client,
    + * based on a {@link WebSocketSpecification} that describes the REST resource location,
    + * parameter type, and message inbound/outbound types.  Messages are automatically
    + * encoded from (and decoded to) JSON text.
    + *
    + * <p>Subclasses should override the following methods to extend the respective phases.
    + * <ol>
    + *     <li>{@code handshakeInitiated} - occurs upon receipt of a handshake request from an HTTP client.  Useful for parameter validation.</li>
    + *     <li>{@code handshakeCompleted} - occurs upon successful completion; WebSocket is ready for I/O.</li>
    + *     <li>{@code messageReceived}: occurs when a WebSocket message is received on the channel.</li>
    + * </ol>
    + *
    + * <p>The handler supports gateway availability announcements.
    + *
    + * @param <T> The gateway type.
    + * @param <M> The REST parameter type.
    + * @param <O> The outbound message type.
    + * @param <I> The inbound message type.
    + */
    +public abstract class AbstractWebSocketHandler<T extends RestfulGateway, M extends MessageParameters, O extends RequestBody, I extends ResponseBody> extends ChannelInboundHandlerAdapter {
    +
    +	protected final Logger log = LoggerFactory.getLogger(getClass());
    +
    +	private final RedirectHandler redirectHandler;
    +
    +	private final AttributeKey<T> gatewayAttr;
    +
    +	private final WebSocketSpecification<M, O, I> specification;
    +
    +	private final ChannelHandler messageCodec;
    +
    +	private final AttributeKey<M> parametersAttr;
    +
    +	/**
    +	 * Creates a new handler.
    +	 */
    +	public AbstractWebSocketHandler(
    +		@Nonnull CompletableFuture<String> localAddressFuture,
    +		@Nonnull GatewayRetriever<? extends T> leaderRetriever,
    +		@Nonnull Time timeout,
    +		@Nonnull WebSocketSpecification<M, O, I> specification) {
    +		this.redirectHandler = new RedirectHandler<>(localAddressFuture, leaderRetriever, timeout);
    +		this.gatewayAttr = AttributeKey.valueOf("gateway");
    +		this.specification = specification;
    +		this.messageCodec = new JsonWebSocketMessageCodec<>(specification.getInboundClass(), specification.getOutboundClass());
    +		this.parametersAttr = AttributeKey.valueOf("parameters");
    +	}
    +
    +	/**
    +	 * Sets the gateway associated with the channel.
    +	 */
    +	private void setGateway(ChannelHandlerContext ctx, T gateway) {
    +		ctx.attr(gatewayAttr).set(gateway);
    +	}
    +
    +	/**
    +	 * Returns the gateway associated with the channel.
    +	 */
    +	public T getGateway(ChannelHandlerContext ctx) {
    +		T t = ctx.attr(gatewayAttr).get();
    +		Preconditions.checkState(t != null, "Gateway is not available.");
    +		return t;
    +	}
    +
    +	/**
    +	 * Sets the resource parameters associated with the channel.
    +	 *
    +	 * <p>The parameters are established by the WebSocket handshake request.
    +	 */
    +	private void setMessageParameters(ChannelHandlerContext ctx, M parameters) {
    +		ctx.attr(parametersAttr).set(parameters);
    +	}
    +
    +	/**
    +	 * Returns the resource parameters associated with the channel.
    +	 */
    +	public M getMessageParameters(ChannelHandlerContext ctx) {
    +		M o = ctx.attr(parametersAttr).get();
    +		Preconditions.checkState(o != null, "Message parameters are not available.");
    +		return o;
    +	}
    +
    +	@Override
    +	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    +		if (ctx.pipeline().get(RedirectHandler.class.getName()) == null) {
    +			ctx.pipeline().addBefore(ctx.name(), RedirectHandler.class.getName(), redirectHandler);
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    +		if (evt instanceof RedirectHandler.GatewayRetrieved) {
    +			T gateway = ((RedirectHandler.GatewayRetrieved<T>) evt).getGateway();
    +			setGateway(ctx, gateway);
    +			log.debug("Gateway retrieved: {}", gateway);
    +		}
    +		else if (evt instanceof WebSocketServerProtocolHandler.ServerHandshakeStateEvent) {
    +			WebSocketServerProtocolHandler.ServerHandshakeStateEvent handshakeEvent = (WebSocketServerProtocolHandler.ServerHandshakeStateEvent) evt;
    +			if (handshakeEvent == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
    +				log.debug("Handshake completed with client IP: {}", ctx.channel().remoteAddress());
    +				M parameters = getMessageParameters(ctx);
    +				handshakeCompleted(ctx, parameters);
    +			}
    +		}
    +
    +		super.userEventTriggered(ctx, evt);
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public void channelRead(ChannelHandlerContext ctx, Object o) throws Exception {
    +		if (specification.getInboundClass().isAssignableFrom(o.getClass())) {
    +			// process an inbound message
    +			M parameters = getMessageParameters(ctx);
    +			messageReceived(ctx, parameters, (O) o);
    +			return;
    +		}
    +
    +		if (!(o instanceof Routed)) {
    +			// a foreign message
    +			ctx.fireChannelRead(o);
    +			return;
    +		}
    +
    +		// process an inbound HTTP request
    +		Routed request = (Routed) o;
    +
    +		// parse the REST request parameters
    +		M messageParameters = specification.getUnresolvedMessageParameters();
    +		try {
    +			messageParameters.resolveParameters(request.pathParams(), request.queryParams());
    +			if (!messageParameters.isResolved()) {
    +				throw new IllegalArgumentException("One or more mandatory parameters is missing.");
    +			}
    +		}
    +		catch (IllegalArgumentException e) {
    +			HandlerUtils.sendErrorResponse(
    +				ctx,
    +				request.request(),
    +				new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", e.getMessage())),
    +				HttpResponseStatus.BAD_REQUEST);
    +			ReferenceCountUtil.release(request);
    +			return;
    +		}
    +		setMessageParameters(ctx, messageParameters);
    +
    +		// validate the inbound handshake request with the subclass
    +		CompletableFuture<Void> handshakeReady;
    +		try {
    +			handshakeReady = handshakeInitiated(ctx, messageParameters);
    +		} catch (Exception e) {
    +			handshakeReady = FutureUtils.completedExceptionally(e);
    +		}
    +		handshakeReady.whenCompleteAsync((Void v, Throwable throwable) -> {
    +			try {
    +				if (throwable != null) {
    +					Throwable error = ExceptionUtils.stripCompletionException(throwable);
    +					if (error instanceof RestHandlerException) {
    +						final RestHandlerException rhe = (RestHandlerException) error;
    +						log.error("Exception occurred in REST handler.", error);
    +						HandlerUtils.sendErrorResponse(ctx, request.request(), new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
    +					} else {
    +						log.error("Implementation error: Unhandled exception.", error);
    +						HandlerUtils.sendErrorResponse(ctx, request.request(), new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
    +					}
    +				} else {
    +					upgradeToWebSocket(ctx, request);
    +				}
    +			}
    +			finally {
    +				ReferenceCountUtil.release(request);
    +			}
    +		}, ctx.executor());
    +	}
    +
    +	private void upgradeToWebSocket(final ChannelHandlerContext ctx, Routed msg) {
    +
    +		// store the context of the handler that precedes the current handler,
    +		// to use that context later to forward the HTTP request to the WebSocket protocol handler
    +		String before = ctx.pipeline().names().get(ctx.pipeline().names().indexOf(ctx.name()) - 1);
    +		ChannelHandlerContext beforeCtx = ctx.pipeline().context(before);
    +
    +		// inject the websocket protocol handler into this channel, to be active
    +		// until the channel is closed.  note that the handshake may or may not complete synchronously.
    +		ctx.pipeline().addBefore(ctx.name(), WebSocketServerProtocolHandler.class.getName(),
    +			new WebSocketServerProtocolHandler(msg.path(), specification.getSubprotocol()));
    +
    +		// inject the message codec
    +		ctx.pipeline().addBefore(ctx.name(), messageCodec.getClass().getName(), messageCodec);
    +
    +		log.debug("Upgraded channel with WS protocol handler and message codec.");
    +
    +		// forward the message to the installed protocol handler to initiate handshaking
    +		HttpRequest request = msg.request();
    +		ReferenceCountUtil.retain(request);
    +		beforeCtx.fireChannelRead(request);
    +	}
    +
    +	@Override
    +	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +		log.error("WebSocket channel error; closing the channel.", cause);
    +		ctx.close();
    +	}
    +
    +	/**
    +	 * Handles a client handshake request to open a WebSocket resource.  Returns a {@link CompletableFuture} to complete handshaking.
    +	 *
    +	 * <p>Implementations may decide whether to throw {@link RestHandlerException}s or fail the returned
    +	 * {@link CompletableFuture} with a {@link RestHandlerException}.
    +	 *
    +	 * <p>Failing the future with another exception type or throwing unchecked exceptions is regarded as an
    +	 * implementation error as it does not allow us to provide a meaningful HTTP status code. In this case a
    +	 * {@link HttpResponseStatus#INTERNAL_SERVER_ERROR} will be returned.
    +	 *
    +	 * @param ctx the channel handler context
    +	 * @param parameters the REST parameters
    +	 * @return future indicating completion of handshake pre-processing.
    +	 * @throws RestHandlerException to produce a pre-formatted HTTP error response.
    +	 */
    +	protected CompletableFuture<Void> handshakeInitiated(ChannelHandlerContext ctx, M parameters) throws Exception {
    +		return CompletableFuture.completedFuture(null);
    +	}
    +
    +	/**
    +	 * Invoked when the current channel has completed the handshaking to establish a WebSocket connection.
    +	 *
    +	 * @param ctx the channel handler context
    +	 * @param parameters the REST parameters
    +	 * @throws Exception if processing failed.
    +	 */
    +	protected void handshakeCompleted(ChannelHandlerContext ctx, M parameters) throws Exception {
    +	}
    +
    +	/**
    +	 * Invoked when the current channel has received a WebSocket message.
    +	 *
    +	 * <p>Be sure to release the message object (default behavior).
    +	 *
    +	 * @param ctx the channel handler context
    +	 * @param parameters the REST parameters
    +	 * @param msg the message received
    +	 * @throws Exception if the message could not be processed.
    +	 */
    +	protected void messageReceived(ChannelHandlerContext ctx, M parameters, O msg) throws Exception {
    +		ReferenceCountUtil.release(msg);
    --- End diff --
    
    I think I will change the behavior to be more consistent with `SimpleChannelInboundHandler` and auto-release the message rather than forcing the subclass to release it.  Note that `channelRead0` is being renamed to `messageReceived` in Netty 5.
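A minimal sketch of the auto-release behavior described above, assuming plain-Java stand-ins rather than the real Netty classes: `CountedMessage`, `AutoReleasingHandler`, and `AutoReleaseDemo` are hypothetical names used only for illustration. The base class owns the `try`/`finally`, so a subclass hook never has to release the message itself, even when it throws.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted message (not a Netty type).
final class CountedMessage {
    private final AtomicInteger refCnt = new AtomicInteger(1);
    final String payload;

    CountedMessage(String payload) {
        this.payload = payload;
    }

    int refCnt() {
        return refCnt.get();
    }

    void release() {
        refCnt.decrementAndGet();
    }
}

// Hypothetical base handler: releases the message after the hook returns or throws.
abstract class AutoReleasingHandler {
    final void channelRead(CountedMessage msg) throws Exception {
        try {
            messageReceived(msg);
        } finally {
            // The subclass never has to remember to release.
            msg.release();
        }
    }

    protected abstract void messageReceived(CountedMessage msg) throws Exception;
}

final class AutoReleaseDemo {
    // Returns the reference count left after the handler has processed one message.
    static int refCountAfterHandling(boolean fail) {
        CountedMessage msg = new CountedMessage("hello");
        AutoReleasingHandler handler = new AutoReleasingHandler() {
            @Override
            protected void messageReceived(CountedMessage m) throws Exception {
                if (fail) {
                    throw new IllegalStateException("processing failed");
                }
            }
        };
        try {
            handler.channelRead(msg);
        } catch (Exception ignored) {
            // The failure is irrelevant here; only the reference count matters.
        }
        return msg.refCnt();
    }

    public static void main(String[] args) {
        System.out.println(refCountAfterHandling(false));
        System.out.println(refCountAfterHandling(true));
    }
}
```

The trade-off is that a subclass that wants to keep the message beyond the callback would have to retain it explicitly.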


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142640401
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -275,4 +301,121 @@ public HttpResponseStatus getHttpResponseStatus() {
     			return httpResponseStatus;
     		}
     	}
    +
    +	public <M extends MessageHeaders<EmptyRequestBody, WebSocketUpgradeResponseBody, U>, U extends MessageParameters, R extends ResponseBody> CompletableFuture<WebSocket> sendWebSocketRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, Class<R> messageClazz, WebSocketListener... listeners) throws IOException {
    +		Preconditions.checkNotNull(targetAddress);
    +		Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range [0, 65536).");
    +		Preconditions.checkNotNull(messageHeaders);
    +		Preconditions.checkNotNull(messageParameters);
    +		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
    +
    +		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
    +		URI webSocketURL = URI.create("ws://" + targetAddress + ":" + targetPort).resolve(targetUrl);
    +		LOG.debug("Sending WebSocket request to {}", webSocketURL);
    +
    +		final HttpHeaders headers = new DefaultHttpHeaders()
    +			.add(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
    +
    +		Bootstrap bootstrap1 = bootstrap.clone().handler(new ClientBootstrap() {
    +			@Override
    +			protected void initChannel(SocketChannel channel) throws Exception {
    +				super.initChannel(channel);
    +				channel.pipeline()
    +					.addLast(new WebSocketClientProtocolHandler(webSocketURL, WebSocketVersion.V13, null, false, headers, 65535))
    +					.addLast(new WsResponseHandler(channel, messageClazz, listeners));
    +			}
    +		});
    +
    +		return CompletableFuture.supplyAsync(() -> bootstrap1.connect(targetAddress, targetPort), executor)
    +			.thenApply((channel) -> {
    +				try {
    +					return channel.sync();
    +				} catch (InterruptedException e) {
    +					throw new FlinkRuntimeException(e);
    +				}
    +			})
    +			.thenApply((ChannelFuture::channel))
    +			.thenCompose(channel -> {
    +				WsResponseHandler handler = channel.pipeline().get(WsResponseHandler.class);
    +				return handler.getWebSocketFuture();
    +			});
    +	}
    +
    +	private static class WsResponseHandler extends SimpleChannelInboundHandler<Object> implements WebSocket {
    +
    +		private final Channel channel;
    +		private final Class<? extends ResponseBody> messageClazz;
    +		private final List<WebSocketListener> listeners = new CopyOnWriteArrayList<>();
    +
    +		private final CompletableFuture<WebSocket> webSocketFuture = new CompletableFuture<>();
    +
    +		CompletableFuture<WebSocket> getWebSocketFuture() {
    +			return webSocketFuture;
    +		}
    +
    +		public WsResponseHandler(Channel channel, Class<? extends ResponseBody> messageClazz, WebSocketListener[] listeners) {
    +			this.channel = channel;
    +			this.messageClazz = messageClazz;
    +			this.listeners.addAll(Arrays.asList(listeners));
    +		}
    +
    +		@Override
    +		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +			LOG.warn("WebSocket exception", cause);
    +			webSocketFuture.completeExceptionally(cause);
    +		}
    +
    +		@Override
    +		public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    +			if (evt instanceof WebSocketClientProtocolHandler.ClientHandshakeStateEvent) {
    +				WebSocketClientProtocolHandler.ClientHandshakeStateEvent wsevt = (WebSocketClientProtocolHandler.ClientHandshakeStateEvent) evt;
    +				switch(wsevt) {
    --- End diff --
    
    Missing space after `switch` (should be `switch (wsevt)`).


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142637674
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocketListener.java ---
    @@ -0,0 +1,27 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.websocket;
    +
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.util.event.EventListener;
    +
    +/**
    + * A listener for WebSocket messages.
    + */
    +public interface WebSocketListener extends EventListener<ResponseBody> { }
    --- End diff --
    
    I would add a proper type parameter to `WebSocketListener`. Currently every implementation is forced to do `instanceof` checks and casts.
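One way the suggestion could look, as a rough sketch only: the interfaces below are simplified stand-ins for the Flink types named in the diff, and `TypedWebSocketListener`, `JobStatusMessage`, and `TypedListenerDemo` are hypothetical names introduced for illustration. With a type parameter, the listener receives the concrete message type directly, so no `instanceof` check or cast is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Flink types referenced in the diff (not the real classes).
interface ResponseBody { }

interface EventListener<T> {
    void onEvent(T event);
}

// Hypothetical typed variant: the listener declares the message type it expects.
interface TypedWebSocketListener<T extends ResponseBody> extends EventListener<T> { }

// Hypothetical message type used only for this example.
final class JobStatusMessage implements ResponseBody {
    final String status;

    JobStatusMessage(String status) {
        this.status = status;
    }
}

final class TypedListenerDemo {
    // Delivers two messages to a typed listener and returns the statuses it saw.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        // No instanceof or cast: msg is already a JobStatusMessage.
        TypedWebSocketListener<JobStatusMessage> listener = msg -> seen.add(msg.status);
        listener.onEvent(new JobStatusMessage("RUNNING"));
        listener.onEvent(new JobStatusMessage("FINISHED"));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```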


---

[GitHub] flink issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, c...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/4767
  
    Updated the description based on the latest PR.


---

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142712872
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocket.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.websocket;
    +
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
    +
    +/**
    + * A WebSocket for sending and receiving messages.
    + */
    +public interface WebSocket {
    +
    +	/**
    +	 * Adds a listener for websocket messages.
    +	 */
    +	void addListener(WebSocketListener listener);
    +
    +	/**
    +	 * Sends a message.
    +	 */
    +	ChannelFuture send(ResponseBody message);
    --- End diff --
    
    Good catch


---

[GitHub] flink issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, c...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/4767
  
    @tillrohrmann, are you still interested in this WebSocket code for the REST server? Aside from rebasing, are there any 'must fix' issues here?


---