You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/09 13:51:37 UTC
[tinkerpop] branch driver-35 updated: Used netty's WebSocketClientProtocolHandler
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/driver-35 by this push:
new 1a5d0b0 Used netty's WebSocketClientProtocolHandler
1a5d0b0 is described below
commit 1a5d0b02c15f9a3dc6980e09e84af187b9d460db
Author: stephen <sp...@gmail.com>
AuthorDate: Mon Dec 9 08:50:28 2019 -0500
Used netty's WebSocketClientProtocolHandler
Let netty do more of the heavy lifting so we can factor out our own custom code.
---
.../tinkerpop/gremlin/driver/Channelizer.java | 21 +++++----
.../tinkerpop/gremlin/driver/ConnectionPool.java | 7 ++-
.../driver/handler/WebSocketClientHandler.java | 50 ++++------------------
...loseHandler.java => WebSocketCloseHandler.java} | 2 +-
.../handler/WebSocketGremlinResponseDecoder.java | 1 -
.../gremlin/driver/simple/WebSocketClient.java | 15 ++++---
6 files changed, 37 insertions(+), 59 deletions(-)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 5d20ab1..0ebfbfa 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -26,7 +26,9 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
+import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
@@ -37,7 +39,7 @@ import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebsocketCloseHandler;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketCloseHandler;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -140,18 +142,21 @@ public interface Channelizer extends ChannelHandler {
throw new IllegalStateException("To use wss scheme ensure that enableSsl is set to true in configuration");
final int maxContentLength = cluster.connectionPoolSettings().maxContentLength;
- // TODO: Replace WebSocketClientHandler with Netty's WebSocketClientProtocolHandler
- final WebSocketClientHandler handler = new WebSocketClientHandler(
- WebSocketClientHandshakerFactory.newHandshaker(
- connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength),
- connectionPool.getActiveChannels());
- int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS));
+ final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
+ connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false,
+ EmptyHttpHeaders.INSTANCE, maxContentLength);
+ final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler(
+ handshaker, true, false, 9000);
+ final WebSocketClientHandler handler = new WebSocketClientHandler(connectionPool.getActiveChannels());
+
+ final int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS));
pipeline.addLast("http-codec", new HttpClientCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
pipeline.addLast("netty-idle-state-Handler", new IdleStateHandler(0, keepAliveInterval, 0));
+ pipeline.addLast("netty-ws-handler", nettyWsHandler);
pipeline.addLast("ws-client-handler", handler);
- pipeline.addLast("ws-close-handler", new WebsocketCloseHandler());
+ pipeline.addLast("ws-close-handler", new WebSocketCloseHandler());
pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder);
pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder);
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 5f7ee3b..35cecee 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeoutException;
* requests to a specific server. It is also the gatekeeper for the number of simultaneous requests
* to the server.
*
- * More specifically, it associates a Netty {@link Channel} with a {@link Connection}.
+ * More specifically, it associates a Netty {@code Channel} with a {@link Connection}.
*
* A typical workflow for the lifetime of a Gremlin request would be as follows:
* 1. Connection pool is set up attached to a host on initialization.
@@ -65,7 +65,8 @@ public interface ConnectionPool {
/**
* Release the connection and associated resources (like channel) so that the resources can be re-used.
*/
- CompletableFuture<Void> releaseConnection(Connection conn);
+ CompletableFuture<Void> releaseConnection(final Connection conn);
+
/**
* Close the connection pool and all associated resources gracefully.
* This method should be made idempotent and thread safe.
@@ -73,10 +74,12 @@ public interface ConnectionPool {
CompletableFuture<Void> closeAsync();
ScheduledExecutorService executor();
+
/**
* @return {@link Host} associated with the connection pool
*/
Host getHost();
+
/**
* @return {@link Cluster} containing the {@link Host}
*/
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 2cd0f95..aded787 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -18,23 +18,19 @@
*/
package org.apache.tinkerpop.gremlin.driver.handler;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
-import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,12 +39,10 @@ import org.slf4j.LoggerFactory;
*/
public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class);
- private final WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture;
private final ChannelGroup activeChannels;
- public WebSocketClientHandler(final WebSocketClientHandshaker handshaker, final ChannelGroup activeChannels) {
- this.handshaker = handshaker;
+ public WebSocketClientHandler(final ChannelGroup activeChannels) {
this.activeChannels = activeChannels;
}
@@ -62,46 +56,15 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
}
@Override
- public void channelActive(final ChannelHandlerContext ctx) throws Exception {
- handshaker.handshake(ctx.channel()).addListener(f -> {
- if (!f.isSuccess()) {
- if (!handshakeFuture.isDone()) handshakeFuture.setFailure(f.cause());
- ctx.fireExceptionCaught(f.cause());
- } else {
- activeChannels.add(ctx.channel());
- }
- });
- }
-
- @Override
protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
- final Channel ch = ctx.channel();
- if (!handshaker.isHandshakeComplete()) {
- // web socket client connected
- handshaker.finishHandshake(ch, (FullHttpResponse) msg);
- handshakeFuture.setSuccess();
- return;
- }
-
- if (msg instanceof FullHttpResponse) {
- final FullHttpResponse response = (FullHttpResponse) msg;
- throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
- + response.content().toString(CharsetUtil.UTF_8) + ')');
- }
-
- // a close frame doesn't mean much here. errors raised from closed channels will mark the host as dead
final WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
ctx.fireChannelRead(frame.retain(2));
- } else if (frame instanceof PingWebSocketFrame) {
- ctx.writeAndFlush(new PongWebSocketFrame());
- }else if (frame instanceof PongWebSocketFrame) {
- logger.debug("Received response from keep-alive request");
} else if (frame instanceof BinaryWebSocketFrame) {
ctx.fireChannelRead(frame.retain(2));
- } else if (frame instanceof CloseWebSocketFrame)
- ch.close();
-
+ } else if (frame instanceof PongWebSocketFrame) {
+ logger.debug("Received response from keep-alive request");
+ }
}
@Override
@@ -114,6 +77,9 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
logger.info("Sending ping frame to the server");
ctx.writeAndFlush(new PingWebSocketFrame());
}
+ } else if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
+ handshakeFuture.setSuccess();
+ activeChannels.add(ctx.channel());
}
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketCloseHandler.java
similarity index 97%
rename from gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java
rename to gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketCloseHandler.java
index f93ea93..86c6a75 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketCloseHandler.java
@@ -32,7 +32,7 @@ import io.netty.util.AttributeKey;
* <p>
* This handler is also idempotent and sends out the CloseFrame only once.
*/
-public class WebsocketCloseHandler extends ChannelOutboundHandlerAdapter {
+public class WebSocketCloseHandler extends ChannelOutboundHandlerAdapter {
private static final AttributeKey<Boolean> CLOSE_WS_SENT = AttributeKey.newInstance("closeWebSocketSent");
@Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
index 383e5a5..ec88bb9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
@@ -18,7 +18,6 @@
*/
package org.apache.tinkerpop.gremlin.driver.handler;
-import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
import io.netty.channel.ChannelHandler;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
index 651b1f3..2d045d1 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
@@ -22,6 +22,8 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.EmptyHttpHeaders;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
@@ -66,11 +68,13 @@ public class WebSocketClient extends AbstractClient {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
try {
- final WebSocketClientHandler wsHandler =
- new WebSocketClientHandler(
- WebSocketClientHandshakerFactory.newHandshaker(
- uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 65536),
- new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
+ final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
+ uri, WebSocketVersion.V13, null, false,
+ EmptyHttpHeaders.INSTANCE, 65536);
+ final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler(
+ handshaker, true, false, 9000);
+ final WebSocketClientHandler wsHandler = new WebSocketClientHandler(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
+
final MessageSerializer serializer = new GraphBinaryMessageSerializerV1();
b.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@@ -80,6 +84,7 @@ public class WebSocketClient extends AbstractClient {
p.addLast(
new HttpClientCodec(),
new HttpObjectAggregator(65536),
+ nettyWsHandler,
wsHandler,
new WebSocketGremlinRequestEncoder(true, serializer),
new WebSocketGremlinResponseDecoder(serializer),