You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/09 13:51:37 UTC

[tinkerpop] branch driver-35 updated: Used netty's WebSocketClientProtocolHandler

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/driver-35 by this push:
     new 1a5d0b0  Used netty's WebSocketClientProtocolHandler
1a5d0b0 is described below

commit 1a5d0b02c15f9a3dc6980e09e84af187b9d460db
Author: stephen <sp...@gmail.com>
AuthorDate: Mon Dec 9 08:50:28 2019 -0500

    Used netty's WebSocketClientProtocolHandler
    
    Let netty do more of the heavy lifting so we could factor out our own custom code.
---
 .../tinkerpop/gremlin/driver/Channelizer.java      | 21 +++++----
 .../tinkerpop/gremlin/driver/ConnectionPool.java   |  7 ++-
 .../driver/handler/WebSocketClientHandler.java     | 50 ++++------------------
 ...loseHandler.java => WebSocketCloseHandler.java} |  2 +-
 .../handler/WebSocketGremlinResponseDecoder.java   |  1 -
 .../gremlin/driver/simple/WebSocketClient.java     | 15 ++++---
 6 files changed, 37 insertions(+), 59 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 5d20ab1..0ebfbfa 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -26,7 +26,9 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.EmptyHttpHeaders;
 import io.netty.handler.codec.http.HttpClientCodec;
 import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
+import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
 import io.netty.handler.codec.http.websocketx.WebSocketVersion;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.timeout.IdleStateHandler;
@@ -37,7 +39,7 @@ import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebsocketCloseHandler;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketCloseHandler;
 
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -140,18 +142,21 @@ public interface Channelizer extends ChannelHandler {
                 throw new IllegalStateException("To use wss scheme ensure that enableSsl is set to true in configuration");
 
             final int maxContentLength = cluster.connectionPoolSettings().maxContentLength;
-            // TODO: Replace WebSocketClientHandler with Netty's WebSocketClientProtocolHandler
-            final WebSocketClientHandler handler = new WebSocketClientHandler(
-                    WebSocketClientHandshakerFactory.newHandshaker(
-                            connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength),
-                    connectionPool.getActiveChannels());
 
-            int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS));
+            final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
+                    connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false,
+                    EmptyHttpHeaders.INSTANCE, maxContentLength);
+            final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler(
+                    handshaker, true, false, 9000);
+            final WebSocketClientHandler handler = new WebSocketClientHandler(connectionPool.getActiveChannels());
+
+            final int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS));
             pipeline.addLast("http-codec", new HttpClientCodec());
             pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
             pipeline.addLast("netty-idle-state-Handler", new IdleStateHandler(0, keepAliveInterval, 0));
+            pipeline.addLast("netty-ws-handler", nettyWsHandler);
             pipeline.addLast("ws-client-handler", handler);
-            pipeline.addLast("ws-close-handler", new WebsocketCloseHandler());
+            pipeline.addLast("ws-close-handler", new WebSocketCloseHandler());
             pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder);
             pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder);
         }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 5f7ee3b..35cecee 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeoutException;
  * requests to a specific server. It is also the gatekeeper for the number of simultaneous requests
  * to the server.
  *
- * More specifically, it associates a Netty {@link Channel} with a {@link Connection}.
+ * More specifically, it associates a Netty {@code Channel} with a {@link Connection}.
  *
  * A typical workflow for the lifetime of a Gremlin request would be as follows:
  * 1. Connection pool is set up attached to a host on initialization.
@@ -65,7 +65,8 @@ public interface ConnectionPool {
     /**
      * Release the connection and associated resources (like channel) so that the resources can be re-used.
      */
-    CompletableFuture<Void> releaseConnection(Connection conn);
+    CompletableFuture<Void> releaseConnection(final Connection conn);
+
     /**
      * Close the connection pool and all associated resources gracefully.
      * This method should be made idempotent and thread safe.
@@ -73,10 +74,12 @@ public interface ConnectionPool {
     CompletableFuture<Void> closeAsync();
 
     ScheduledExecutorService executor();
+
     /**
      * @return {@link Host} associated with the connection pool
      */
     Host getHost();
+
     /**
      * @return {@link Cluster} containing the {@link Host}
      */
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 2cd0f95..aded787 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -18,23 +18,19 @@
  */
 package org.apache.tinkerpop.gremlin.driver.handler;
 
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.group.ChannelGroup;
-import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.util.CharsetUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,12 +39,10 @@ import org.slf4j.LoggerFactory;
  */
 public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
     private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class);
-    private final WebSocketClientHandshaker handshaker;
     private ChannelPromise handshakeFuture;
     private final ChannelGroup activeChannels;
 
-    public WebSocketClientHandler(final WebSocketClientHandshaker handshaker, final ChannelGroup activeChannels) {
-        this.handshaker = handshaker;
+    public WebSocketClientHandler(final ChannelGroup activeChannels) {
         this.activeChannels = activeChannels;
     }
 
@@ -62,46 +56,15 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
     }
 
     @Override
-    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
-        handshaker.handshake(ctx.channel()).addListener(f -> {
-                if (!f.isSuccess()) {
-                    if (!handshakeFuture.isDone()) handshakeFuture.setFailure(f.cause());
-                    ctx.fireExceptionCaught(f.cause());
-                } else {
-                    activeChannels.add(ctx.channel());
-                }
-        });
-    }
-
-    @Override
     protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
-        final Channel ch = ctx.channel();
-        if (!handshaker.isHandshakeComplete()) {
-            // web socket client connected
-            handshaker.finishHandshake(ch, (FullHttpResponse) msg);
-            handshakeFuture.setSuccess();
-            return;
-        }
-
-        if (msg instanceof FullHttpResponse) {
-            final FullHttpResponse response = (FullHttpResponse) msg;
-            throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
-                    + response.content().toString(CharsetUtil.UTF_8) + ')');
-        }
-
-        // a close frame doesn't mean much here.  errors raised from closed channels will mark the host as dead
         final WebSocketFrame frame = (WebSocketFrame) msg;
         if (frame instanceof TextWebSocketFrame) {
             ctx.fireChannelRead(frame.retain(2));
-        } else if (frame instanceof PingWebSocketFrame) {
-            ctx.writeAndFlush(new PongWebSocketFrame());
-        }else if (frame instanceof PongWebSocketFrame) {
-            logger.debug("Received response from keep-alive request");
         } else if (frame instanceof BinaryWebSocketFrame) {
             ctx.fireChannelRead(frame.retain(2));
-        } else if (frame instanceof CloseWebSocketFrame)
-            ch.close();
-
+        } else if (frame instanceof PongWebSocketFrame) {
+            logger.debug("Received response from keep-alive request");
+        }
     }
 
     @Override
@@ -114,6 +77,9 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
                 logger.info("Sending ping frame to the server");
                 ctx.writeAndFlush(new PingWebSocketFrame());
             }
+        } else if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
+            handshakeFuture.setSuccess();
+            activeChannels.add(ctx.channel());
         }
     }
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketCloseHandler.java
similarity index 97%
rename from gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java
rename to gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketCloseHandler.java
index f93ea93..86c6a75 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketCloseHandler.java
@@ -32,7 +32,7 @@ import io.netty.util.AttributeKey;
  * <p>
  * This handler is also idempotent and sends out the CloseFrame only once.
  */
-public class WebsocketCloseHandler extends ChannelOutboundHandlerAdapter {
+public class WebSocketCloseHandler extends ChannelOutboundHandlerAdapter {
     private static final AttributeKey<Boolean> CLOSE_WS_SENT = AttributeKey.newInstance("closeWebSocketSent");
 
     @Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
index 383e5a5..ec88bb9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
@@ -18,7 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.driver.handler;
 
-import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
 import io.netty.channel.ChannelHandler;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
index 651b1f3..2d045d1 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
@@ -22,6 +22,8 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.handler.codec.http.EmptyHttpHeaders;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
@@ -66,11 +68,13 @@ public class WebSocketClient extends AbstractClient {
             throw new IllegalArgumentException("Unsupported protocol: " + protocol);
 
         try {
-            final WebSocketClientHandler wsHandler =
-                    new WebSocketClientHandler(
-                            WebSocketClientHandshakerFactory.newHandshaker(
-                                    uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 65536),
-                            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
+            final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
+                    uri, WebSocketVersion.V13, null, false,
+                    EmptyHttpHeaders.INSTANCE, 65536);
+            final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler(
+                    handshaker, true, false, 9000);
+            final WebSocketClientHandler wsHandler = new WebSocketClientHandler(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
+
             final MessageSerializer serializer = new GraphBinaryMessageSerializerV1();
             b.channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>() {
@@ -80,6 +84,7 @@ public class WebSocketClient extends AbstractClient {
                             p.addLast(
                                     new HttpClientCodec(),
                                     new HttpObjectAggregator(65536),
+                                    nettyWsHandler,
                                     wsHandler,
                                     new WebSocketGremlinRequestEncoder(true, serializer),
                                     new WebSocketGremlinResponseDecoder(serializer),