You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/11 20:15:22 UTC

[tinkerpop] branch driver-35 updated: Forced SingleRequestConnection to wait for handshake

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/driver-35 by this push:
     new 94e361b  Forced SingleRequestConnection to wait for handshake
94e361b is described below

commit 94e361ba0a474c59cb0a95cb2855985ddb8010a0
Author: stephen <sp...@gmail.com>
AuthorDate: Wed Dec 11 15:13:11 2019 -0500

    Forced SingleRequestConnection to wait for handshake
    
    Not sure it was doing such a good job of that before this change, which is why I think the Travis tests were failing.
---
 .../org/apache/tinkerpop/gremlin/driver/Channelizer.java | 16 +++++++++-------
 .../tinkerpop/gremlin/driver/DefaultConnectionPool.java  |  7 +++++--
 .../org/apache/tinkerpop/gremlin/driver/Settings.java    |  5 ++---
 .../gremlin/driver/SingleRequestConnection.java          | 10 ++++------
 .../gremlin/driver/exception/ConnectionException.java    |  6 ++++++
 .../gremlin/driver/handler/WebSocketClientHandler.java   | 10 +++++++++-
 .../tinkerpop/gremlin/driver/simple/WebSocketClient.java |  4 ++--
 7 files changed, 37 insertions(+), 21 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index fc10371..f5c5301 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -143,13 +143,18 @@ public interface Channelizer extends ChannelHandler {
                 throw new IllegalStateException("To use wss scheme ensure that enableSsl is set to true in configuration");
 
             final int maxContentLength = cluster.connectionPoolSettings().maxContentLength;
+            final long maxWaitForConnection = cluster.connectionPoolSettings().maxWaitForConnection;
 
+            // seems ok to use the maxWaitForConnection as the top end for the handshake because the wait for the
+            // handshake is going to just get interrupted by the wait for the overall connection. no point in adding
+            // another setting specific to the handshake to complicate things.
             final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                     connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false,
                     EmptyHttpHeaders.INSTANCE, maxContentLength);
             final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler(
-                    handshaker, true, false, 9000);
-            final WebSocketClientHandler handler = new WebSocketClientHandler(connectionPool.getActiveChannels());
+                    handshaker, true, false, maxWaitForConnection);
+            final WebSocketClientHandler handler = new WebSocketClientHandler(
+                    connectionPool.getHost().getHostUri(), connectionPool.getActiveChannels());
 
             final int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS));
             pipeline.addLast("http-codec", new HttpClientCodec());
@@ -162,7 +167,6 @@ public interface Channelizer extends ChannelHandler {
             pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder);
         }
 
-
         @Override
         public void connected(final Channel ch) {
             try {
@@ -176,11 +180,9 @@ public interface Channelizer extends ChannelHandler {
                             throw new ConnectionException(connectionPool.getHost().getHostUri(),
                                     "Could not complete websocket handshake - ensure that client protocol matches server", f.cause());
                         }
-                    }).get(3000, TimeUnit.MILLISECONDS);
+                    }).sync();
                 }
-            } catch (ExecutionException ex) {
-                throw new RuntimeException(ex.getCause());
-            } catch (InterruptedException | TimeoutException ex) {
+            }  catch (InterruptedException ex) {
                 // catching the InterruptedException will reset the interrupted flag. This is intentional.
                 throw new RuntimeException(new ConnectionException(connectionPool.getHost().getHostUri(),
                                                                    "Timed out while performing websocket handshake - ensure that client protocol matches server", ex.getCause()));
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
index 8c399cb..1c173d2 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.group.ChannelGroup;
@@ -32,12 +33,15 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.ConnectException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -214,9 +218,8 @@ public class DefaultConnectionPool implements ConnectionPool {
 
     @Override
     public Connection prepareConnection() throws TimeoutException, ConnectException {
-        if (closeFuture.get() != null) {
+        if (closeFuture.get() != null)
             throw new RuntimeException(this + " is closing. Cannot borrow connection.");
-        }
 
         // Get a channel, verify handshake is done and then attach it to a connectionPool
         final Channel ch = this.channelPool.acquire().syncUninterruptibly().getNow();
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 5deba8b..a4e8c1a 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -288,9 +288,8 @@ final class Settings {
         public int maxSize = ConnectionPool.DEFAULT_MAX_POOL_SIZE;
 
         /**
-         * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. This
-         * setting is only relevant to {@link Channelizer} implementations that return {@code true} for
-         * {@link Channelizer#supportsKeepAlive()}. Set to zero to disable this feature.
+         * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. Set to
+         * zero to disable this feature.
          */
         public long keepAliveInterval = Connection.DEFAULT_KEEP_ALIVE_INTERVAL;
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java
index 6bb745d..6283fff 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java
@@ -57,10 +57,9 @@ public class SingleRequestConnection implements Connection {
     static final AttributeKey<ResultQueue> RESULT_QUEUE_ATTRIBUTE_KEY = AttributeKey.newInstance("resultQueueFuture");
 
     SingleRequestConnection(final Channel channel, final ConnectionPool pool) {
-        /* A channel is attached with a request only when the channel is active. This is the responsibility
-         * of channelpool to ensure that the channel attached to this connection is healthy. Something is fishy
-         * if this is not true, hence, IllegalState.
-         */
+        // A channel is attached to a request only when the channel is active. It is the responsibility
+        // of the channel pool to ensure that the channel attached to this connection is healthy. Something
+        // is fishy if this is not true, hence, IllegalState.
         if (!channel.isActive()) {
             throw new IllegalStateException("Channel " + channel + " is not active.");
         }
@@ -150,9 +149,8 @@ public class SingleRequestConnection implements Connection {
      */
     @Override
     public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultQueueSetup) {
-        if (this.resultFuture != null) {
+        if (this.resultFuture != null)
             throw new IllegalStateException("This " + this + " is already in use. Cannot reuse it for request " + requestMessage);
-        }
 
         this.resultFuture = resultQueueSetup;
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
index 67101b3..5710cf9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
@@ -29,6 +29,12 @@ public class ConnectionException extends Exception {
     private URI uri;
     private InetSocketAddress address;
 
+    public ConnectionException(final URI uri, final String message) {
+        super(message);
+        this.uri = uri;
+        this.address = null;
+    }
+
     public ConnectionException(final URI uri, final InetSocketAddress addy, final String message) {
         super(message);
         this.address = addy;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 1d0b73f..95f6923 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -32,9 +32,12 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
+
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
@@ -42,10 +45,12 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
     private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class);
     private ChannelPromise handshakeFuture;
     private final ChannelGroup activeChannels;
+    private final URI host;
 
-    public WebSocketClientHandler(final ChannelGroup activeChannels) {
+    public WebSocketClientHandler(final URI host, final ChannelGroup activeChannels) {
         super(false);
         this.activeChannels = activeChannels;
+        this.host = host;
     }
 
     public ChannelFuture handshakeFuture() {
@@ -75,6 +80,9 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
         if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
             handshakeFuture.setSuccess();
             activeChannels.add(ctx.channel());
+        } else if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) {
+            handshakeFuture.setFailure(new ConnectionException(host,
+                    "Timed out while performing websocket handshake - ensure that client protocol matches server"));
         } else if (event instanceof IdleStateEvent) {
             final IdleStateEvent e = (IdleStateEvent) event;
             if (e.state() == IdleState.READER_IDLE) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
index 2d045d1..5f2603d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
@@ -72,8 +72,8 @@ public class WebSocketClient extends AbstractClient {
                     uri, WebSocketVersion.V13, null, false,
                     EmptyHttpHeaders.INSTANCE, 65536);
             final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler(
-                    handshaker, true, false, 9000);
-            final WebSocketClientHandler wsHandler = new WebSocketClientHandler(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
+                    handshaker, true, false, 30000);
+            final WebSocketClientHandler wsHandler = new WebSocketClientHandler(uri, new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
 
             final MessageSerializer serializer = new GraphBinaryMessageSerializerV1();
             b.channel(NioSocketChannel.class)