You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/10 12:35:17 UTC
[tinkerpop] 17/18: Limited some listener creation for handshakes
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 390f9d5743080de1f2f2aaa031b23e967c9b09da
Author: stephen <sp...@gmail.com>
AuthorDate: Mon Dec 9 12:16:35 2019 -0500
Limited some listener creation for handshakes
The handshake should only happen once so calls to Channelizer.connect() might be redundant if the handshake already occurred.
---
.../tinkerpop/gremlin/driver/Channelizer.java | 22 +++++++++++++---------
.../gremlin/driver/DefaultConnectionPool.java | 2 --
.../driver/handler/WebSocketClientHandler.java | 19 +++++++++++--------
3 files changed, 24 insertions(+), 19 deletions(-)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 0ebfbfa..fc10371 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.driver;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
@@ -165,15 +166,18 @@ public interface Channelizer extends ChannelHandler {
@Override
public void connected(final Channel ch) {
try {
- // block for a few seconds - if the handshake takes longer than there's gotta be issues with that
- // server. more than likely, SSL is enabled on the server, but the client forgot to enable it or
- // perhaps the server is not configured for websockets.
- ((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture().addListener( f -> {
- if (!f.isSuccess()) {
- throw new ConnectionException(connectionPool.getHost().getHostUri(),
- "Could not complete websocket handshake - ensure that client protocol matches server", f.cause());
- }
- }).get(1500, TimeUnit.MILLISECONDS);
+ // be sure the handshake is done - if the handshake takes longer than the specified time there's
+ // gotta be issues with that server. a common problem where this comes up: SSL is enabled on the
+ // server, but the client forgot to enable it or perhaps the server is not configured for websockets.
+ final ChannelFuture handshakeFuture = ((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture();
+ if (!handshakeFuture.isDone()) {
+ handshakeFuture.addListener(f -> {
+ if (!f.isSuccess()) {
+ throw new ConnectionException(connectionPool.getHost().getHostUri(),
+ "Could not complete websocket handshake - ensure that client protocol matches server", f.cause());
+ }
+ }).get(3000, TimeUnit.MILLISECONDS);
+ }
} catch (ExecutionException ex) {
throw new RuntimeException(ex.getCause());
} catch (InterruptedException | TimeoutException ex) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
index 7bda3d5..8c399cb 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
@@ -220,8 +220,6 @@ public class DefaultConnectionPool implements ConnectionPool {
// Get a channel, verify handshake is done and then attach it to a connectionPool
final Channel ch = this.channelPool.acquire().syncUninterruptibly().getNow();
-
- // TODO: This call is un-necessary on every channel acquire, since handshake is done once.
channelizer.connected(ch);
return new SingleRequestConnection(ch, this);
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index aded787..1d0b73f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -31,6 +31,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +44,7 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
private final ChannelGroup activeChannels;
public WebSocketClientHandler(final ChannelGroup activeChannels) {
+ super(false);
this.activeChannels = activeChannels;
}
@@ -58,18 +60,22 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
final WebSocketFrame frame = (WebSocketFrame) msg;
- if (frame instanceof TextWebSocketFrame) {
- ctx.fireChannelRead(frame.retain(2));
- } else if (frame instanceof BinaryWebSocketFrame) {
- ctx.fireChannelRead(frame.retain(2));
+ if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
+ ctx.fireChannelRead(frame.retain());
} else if (frame instanceof PongWebSocketFrame) {
logger.debug("Received response from keep-alive request");
+ ReferenceCountUtil.release(frame);
+ } else {
+ throw new IllegalStateException("Unexpected message of " + msg.getClass().getSimpleName() + ": " + msg);
}
}
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object event) throws Exception {
- if (event instanceof IdleStateEvent) {
+ if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
+ handshakeFuture.setSuccess();
+ activeChannels.add(ctx.channel());
+ } else if (event instanceof IdleStateEvent) {
final IdleStateEvent e = (IdleStateEvent) event;
if (e.state() == IdleState.READER_IDLE) {
logger.warn("Server " + ctx.channel() + " has been idle for too long.");
@@ -77,9 +83,6 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
logger.info("Sending ping frame to the server");
ctx.writeAndFlush(new PingWebSocketFrame());
}
- } else if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
- handshakeFuture.setSuccess();
- activeChannels.add(ctx.channel());
}
}