You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/09 17:24:11 UTC

[tinkerpop] branch driver-35 updated: Limited some listener creation for handshakes

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/driver-35 by this push:
     new 3597d5f  Limited some listener creation for handshakes
3597d5f is described below

commit 3597d5f884e4678f0747133d3d536accaefc0d00
Author: stephen <sp...@gmail.com>
AuthorDate: Mon Dec 9 12:16:35 2019 -0500

    Limited some listener creation for handshakes
    
    The handshake should only happen once, so calls to Channelizer.connected() might be redundant if the handshake already occurred.
---
 .../tinkerpop/gremlin/driver/Channelizer.java      | 22 +++++++++++++---------
 .../gremlin/driver/DefaultConnectionPool.java      |  2 --
 .../driver/handler/WebSocketClientHandler.java     | 19 +++++++++++--------
 3 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 0ebfbfa..fc10371 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.driver;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
@@ -165,15 +166,18 @@ public interface Channelizer extends ChannelHandler {
         @Override
         public void connected(final Channel ch) {
             try {
-                // block for a few seconds - if the handshake takes longer than there's gotta be issues with that
-                // server. more than likely, SSL is enabled on the server, but the client forgot to enable it or
-                // perhaps the server is not configured for websockets.
-                ((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture().addListener( f -> {
-                    if (!f.isSuccess()) {
-                        throw new ConnectionException(connectionPool.getHost().getHostUri(),
-                                                                           "Could not complete websocket handshake - ensure that client protocol matches server", f.cause());
-                    }
-                }).get(1500, TimeUnit.MILLISECONDS);
+                // be sure the handshake is done - if the handshake takes longer than the specified time there's
+                // gotta be issues with that server. a common problem where this comes up: SSL is enabled on the
+                // server, but the client forgot to enable it or perhaps the server is not configured for websockets.
+                final ChannelFuture handshakeFuture = ((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture();
+                if (!handshakeFuture.isDone()) {
+                    handshakeFuture.addListener(f -> {
+                        if (!f.isSuccess()) {
+                            throw new ConnectionException(connectionPool.getHost().getHostUri(),
+                                    "Could not complete websocket handshake - ensure that client protocol matches server", f.cause());
+                        }
+                    }).get(3000, TimeUnit.MILLISECONDS);
+                }
             } catch (ExecutionException ex) {
                 throw new RuntimeException(ex.getCause());
             } catch (InterruptedException | TimeoutException ex) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
index 7bda3d5..8c399cb 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
@@ -220,8 +220,6 @@ public class DefaultConnectionPool implements ConnectionPool {
 
         // Get a channel, verify handshake is done and then attach it to a connectionPool
         final Channel ch = this.channelPool.acquire().syncUninterruptibly().getNow();
-
-        // TODO: This call is un-necessary on every channel acquire, since handshake is done once.
         channelizer.connected(ch);
 
         return new SingleRequestConnection(ch, this);
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index aded787..1d0b73f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -31,6 +31,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.ReferenceCountUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +44,7 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
     private final ChannelGroup activeChannels;
 
     public WebSocketClientHandler(final ChannelGroup activeChannels) {
+        super(false);
         this.activeChannels = activeChannels;
     }
 
@@ -58,18 +60,22 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
     @Override
     protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
         final WebSocketFrame frame = (WebSocketFrame) msg;
-        if (frame instanceof TextWebSocketFrame) {
-            ctx.fireChannelRead(frame.retain(2));
-        } else if (frame instanceof BinaryWebSocketFrame) {
-            ctx.fireChannelRead(frame.retain(2));
+        if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
+            ctx.fireChannelRead(frame.retain());
         } else if (frame instanceof PongWebSocketFrame) {
             logger.debug("Received response from keep-alive request");
+            ReferenceCountUtil.release(frame);
+        } else {
+            throw new IllegalStateException("Unexpected message of " + msg.getClass().getSimpleName() + ": " + msg);
         }
     }
 
     @Override
     public void userEventTriggered(final ChannelHandlerContext ctx, final Object event) throws Exception {
-        if (event instanceof IdleStateEvent) {
+        if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
+            handshakeFuture.setSuccess();
+            activeChannels.add(ctx.channel());
+        } else if (event instanceof IdleStateEvent) {
             final IdleStateEvent e = (IdleStateEvent) event;
             if (e.state() == IdleState.READER_IDLE) {
                 logger.warn("Server " + ctx.channel() + " has been idle for too long.");
@@ -77,9 +83,6 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
                 logger.info("Sending ping frame to the server");
                 ctx.writeAndFlush(new PingWebSocketFrame());
             }
-        } else if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
-            handshakeFuture.setSuccess();
-            activeChannels.add(ctx.channel());
         }
     }