You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/11 20:15:22 UTC
[tinkerpop] branch driver-35 updated: Forced
SingleRequestConnection to wait for handshake
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/driver-35 by this push:
new 94e361b Forced SingleRequestConnection to wait for handshake
94e361b is described below
commit 94e361ba0a474c59cb0a95cb2855985ddb8010a0
Author: stephen <sp...@gmail.com>
AuthorDate: Wed Dec 11 15:13:11 2019 -0500
Forced SingleRequestConnection to wait for handshake
Not sure it was doing such a good job of that before this change which is why I think the travis tests were failing
---
.../org/apache/tinkerpop/gremlin/driver/Channelizer.java | 16 +++++++++-------
.../tinkerpop/gremlin/driver/DefaultConnectionPool.java | 7 +++++--
.../org/apache/tinkerpop/gremlin/driver/Settings.java | 5 ++---
.../gremlin/driver/SingleRequestConnection.java | 10 ++++------
.../gremlin/driver/exception/ConnectionException.java | 6 ++++++
.../gremlin/driver/handler/WebSocketClientHandler.java | 10 +++++++++-
.../tinkerpop/gremlin/driver/simple/WebSocketClient.java | 4 ++--
7 files changed, 37 insertions(+), 21 deletions(-)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index fc10371..f5c5301 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -143,13 +143,18 @@ public interface Channelizer extends ChannelHandler {
throw new IllegalStateException("To use wss scheme ensure that enableSsl is set to true in configuration");
final int maxContentLength = cluster.connectionPoolSettings().maxContentLength;
+ final long maxWaitForConnection = cluster.connectionPoolSettings().maxWaitForConnection;
+ // seems ok to use the maxWaitForConnection as the top end for the handshake because the wait for the
+ // handshake is going just get interrupted by the wait for the overall connection. no point to adding
+ // another setting specific to the handshake to complicate things.
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false,
EmptyHttpHeaders.INSTANCE, maxContentLength);
final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler(
- handshaker, true, false, 9000);
- final WebSocketClientHandler handler = new WebSocketClientHandler(connectionPool.getActiveChannels());
+ handshaker, true, false, maxWaitForConnection);
+ final WebSocketClientHandler handler = new WebSocketClientHandler(
+ connectionPool.getHost().getHostUri(), connectionPool.getActiveChannels());
final int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS));
pipeline.addLast("http-codec", new HttpClientCodec());
@@ -162,7 +167,6 @@ public interface Channelizer extends ChannelHandler {
pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder);
}
-
@Override
public void connected(final Channel ch) {
try {
@@ -176,11 +180,9 @@ public interface Channelizer extends ChannelHandler {
throw new ConnectionException(connectionPool.getHost().getHostUri(),
"Could not complete websocket handshake - ensure that client protocol matches server", f.cause());
}
- }).get(3000, TimeUnit.MILLISECONDS);
+ }).sync();
}
- } catch (ExecutionException ex) {
- throw new RuntimeException(ex.getCause());
- } catch (InterruptedException | TimeoutException ex) {
+ } catch (InterruptedException ex) {
// catching the InterruptedException will reset the interrupted flag. This is intentional.
throw new RuntimeException(new ConnectionException(connectionPool.getHost().getHostUri(),
"Timed out while performing websocket handshake - ensure that client protocol matches server", ex.getCause()));
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
index 8c399cb..1c173d2 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
@@ -32,12 +33,15 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -214,9 +218,8 @@ public class DefaultConnectionPool implements ConnectionPool {
@Override
public Connection prepareConnection() throws TimeoutException, ConnectException {
- if (closeFuture.get() != null) {
+ if (closeFuture.get() != null)
throw new RuntimeException(this + " is closing. Cannot borrow connection.");
- }
// Get a channel, verify handshake is done and then attach it to a connectionPool
final Channel ch = this.channelPool.acquire().syncUninterruptibly().getNow();
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 5deba8b..a4e8c1a 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -288,9 +288,8 @@ final class Settings {
public int maxSize = ConnectionPool.DEFAULT_MAX_POOL_SIZE;
/**
- * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. This
- * setting is only relevant to {@link Channelizer} implementations that return {@code true} for
- * {@link Channelizer#supportsKeepAlive()}. Set to zero to disable this feature.
+ * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. Set to
+ * zero to disable this feature.
*/
public long keepAliveInterval = Connection.DEFAULT_KEEP_ALIVE_INTERVAL;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java
index 6bb745d..6283fff 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java
@@ -57,10 +57,9 @@ public class SingleRequestConnection implements Connection {
static final AttributeKey<ResultQueue> RESULT_QUEUE_ATTRIBUTE_KEY = AttributeKey.newInstance("resultQueueFuture");
SingleRequestConnection(final Channel channel, final ConnectionPool pool) {
- /* A channel is attached with a request only when the channel is active. This is the responsibility
- * of channelpool to ensure that the channel attached to this connection is healthy. Something is fishy
- * if this is not true, hence, IllegalState.
- */
+ // A channel is attached with a request only when the channel is active. This is the responsibility
+ // of channelpool to ensure that the channel attached to this connection is healthy. Something is fishy
+ // if this is not true, hence, IllegalState.
if (!channel.isActive()) {
throw new IllegalStateException("Channel " + channel + " is not active.");
}
@@ -150,9 +149,8 @@ public class SingleRequestConnection implements Connection {
*/
@Override
public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultQueueSetup) {
- if (this.resultFuture != null) {
+ if (this.resultFuture != null)
throw new IllegalStateException("This " + this + " is already in use. Cannot reuse it for request " + requestMessage);
- }
this.resultFuture = resultQueueSetup;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
index 67101b3..5710cf9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java
@@ -29,6 +29,12 @@ public class ConnectionException extends Exception {
private URI uri;
private InetSocketAddress address;
+ public ConnectionException(final URI uri, final String message) {
+ super(message);
+ this.uri = uri;
+ this.address = null;
+ }
+
public ConnectionException(final URI uri, final InetSocketAddress addy, final String message) {
super(message);
this.address = addy;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 1d0b73f..95f6923 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -32,9 +32,12 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URI;
+
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
@@ -42,10 +45,12 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class);
private ChannelPromise handshakeFuture;
private final ChannelGroup activeChannels;
+ private final URI host;
- public WebSocketClientHandler(final ChannelGroup activeChannels) {
+ public WebSocketClientHandler(final URI host, final ChannelGroup activeChannels) {
super(false);
this.activeChannels = activeChannels;
+ this.host = host;
}
public ChannelFuture handshakeFuture() {
@@ -75,6 +80,9 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
handshakeFuture.setSuccess();
activeChannels.add(ctx.channel());
+ } else if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) {
+ handshakeFuture.setFailure(new ConnectionException(host,
+ "Timed out while performing websocket handshake - ensure that client protocol matches server"));
} else if (event instanceof IdleStateEvent) {
final IdleStateEvent e = (IdleStateEvent) event;
if (e.state() == IdleState.READER_IDLE) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
index 2d045d1..5f2603d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
@@ -72,8 +72,8 @@ public class WebSocketClient extends AbstractClient {
uri, WebSocketVersion.V13, null, false,
EmptyHttpHeaders.INSTANCE, 65536);
final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler(
- handshaker, true, false, 9000);
- final WebSocketClientHandler wsHandler = new WebSocketClientHandler(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
+ handshaker, true, false, 30000);
+ final WebSocketClientHandler wsHandler = new WebSocketClientHandler(uri, new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
final MessageSerializer serializer = new GraphBinaryMessageSerializerV1();
b.channel(NioSocketChannel.class)