You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/10 12:35:13 UTC

[tinkerpop] 13/18: Refactored WebSocketIdleEventHandler into the WebSocketClientHandler

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 804d4c9b77d3892a9821e01e98fea886ba5189ae
Author: stephen <sp...@gmail.com>
AuthorDate: Fri Dec 6 14:08:22 2019 -0500

    Refactored WebSocketIdleEventHandler into the WebSocketClientHandler
    
    Not sure why this was done as two separate things really in the first place.
---
 CHANGELOG.asciidoc                                 |  1 +
 docs/src/upgrade/release-3.5.x.asciidoc            |  5 +-
 .../tinkerpop/gremlin/driver/Channelizer.java      |  7 +--
 .../driver/handler/WebSocketClientHandler.java     | 22 +++++++-
 .../driver/handler/WebSocketIdleEventHandler.java  | 58 ----------------------
 .../driver/handler/WebsocketCloseHandler.java      |  2 -
 .../gremlin/driver/simple/WebSocketClient.java     |  5 +-
 ...ClientSingleRequestConnectionIntegrateTest.java |  2 -
 8 files changed, 31 insertions(+), 71 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index b45630b..beb649d 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -32,6 +32,7 @@ This release also includes changes from <<release-3-4-3, 3.4.3>>.
 * Refactored `MapStep` to move its logic to `ScalarMapStep` so that the old behavior could be preserved while allowing other implementations to have more flexibility.
 * Modified TinkerGraph to support `null` property values and can be configured to disable that feature.
 * Refactored the Java driver to use one connection per request.
+* Refactored functionality of `WebSocketIdleEventHandler` into the `WebSocketClientHandler`.
 * Modified `null` handling in mutations to be consistent for a new `Vertex` as well as update to an existing one.
 * Upgraded to Apache Commons Configuration2.
 * Renamed `StoreStep` to `AggregateLocalStep`.
diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index 3d2297a..3559464 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -257,9 +257,10 @@ The following deprecated classes, methods or fields have been removed in this ve
 ** `org.apache.tinkerpop.gremlin.neo4j.structure.trait.NoMultiNoMetaNeo4jTrait`
 ** `org.apache.tinkerpop.gremlin.neo4j.structure.trait.Neo4jTrait`
 
-Certain elements of the API were not or could not be deprecated in prior versions and were simply renamed for this
-release:
+Certain elements of the API were not or could not be deprecated in prior versions and were simply renamed or removed
+for this release:
 
+* `org.apache.tinkerpop.gremlin.driver.handler.WebSocketIdleEventHandler` was removed and its functionality moved into `WebSocketClientHandler`
 * `org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode#SERVER_ERROR_SCRIPT_EVALUATION` became `SERVER_ERROR_EVALUATION`
 
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-2080[TINKERPOP-2080],
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 723c1c5..4b4d426 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -37,7 +37,6 @@ import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketIdleEventHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.WebsocketCloseHandler;
 
 import java.util.Optional;
@@ -173,7 +172,6 @@ public interface Channelizer extends ChannelHandler {
 
         private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder;
         private WebSocketGremlinResponseDecoder webSocketGremlinResponseDecoder;
-        private WebSocketIdleEventHandler webSocketIdleEventHandler;
 
         @Override
         public void init(Connection connection) {
@@ -185,7 +183,6 @@ public interface Channelizer extends ChannelHandler {
             super.init(connpool);
             webSocketGremlinRequestEncoder = new WebSocketGremlinRequestEncoder(true, cluster.getSerializer());
             webSocketGremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer());
-            webSocketIdleEventHandler = new WebSocketIdleEventHandler(connpool.getActiveChannels());
         }
 
         /**
@@ -217,13 +214,13 @@ public interface Channelizer extends ChannelHandler {
             // TODO: Replace WebSocketClientHandler with Netty's WebSocketClientProtocolHandler
             final WebSocketClientHandler handler = new WebSocketClientHandler(
                     WebSocketClientHandshakerFactory.newHandshaker(
-                            connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength));
+                            connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength),
+                    connectionPool.getActiveChannels());
 
             int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS));
             pipeline.addLast("http-codec", new HttpClientCodec());
             pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
             pipeline.addLast("netty-idle-state-Handler", new IdleStateHandler(0, keepAliveInterval, 0));
-            pipeline.addLast("ws-idle-handler", webSocketIdleEventHandler);
             pipeline.addLast("ws-client-handler", handler);
             pipeline.addLast("ws-close-handler", new WebsocketCloseHandler());
             pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder);
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 1d6d92f..2cd0f95 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.group.ChannelGroup;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
@@ -31,6 +32,8 @@ import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.util.CharsetUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,9 +45,11 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
     private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class);
     private final WebSocketClientHandshaker handshaker;
     private ChannelPromise handshakeFuture;
+    private final ChannelGroup activeChannels;
 
-    public WebSocketClientHandler(final WebSocketClientHandshaker handshaker) {
+    public WebSocketClientHandler(final WebSocketClientHandshaker handshaker, final ChannelGroup activeChannels) {
         this.handshaker = handshaker;
+        this.activeChannels = activeChannels;
     }
 
     public ChannelFuture handshakeFuture() {
@@ -62,6 +67,8 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
                 if (!f.isSuccess()) {
                     if (!handshakeFuture.isDone()) handshakeFuture.setFailure(f.cause());
                     ctx.fireExceptionCaught(f.cause());
+                } else {
+                    activeChannels.add(ctx.channel());
                 }
         });
     }
@@ -98,6 +105,19 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
     }
 
     @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object event) throws Exception {
+        if (event instanceof IdleStateEvent) {
+            final IdleStateEvent e = (IdleStateEvent) event;
+            if (e.state() == IdleState.READER_IDLE) {
+                logger.warn("Server " + ctx.channel() + " has been idle for too long.");
+            } else if (e.state() == IdleState.WRITER_IDLE || e.state() == IdleState.ALL_IDLE) {
+                logger.info("Sending ping frame to the server");
+                ctx.writeAndFlush(new PingWebSocketFrame());
+            }
+        }
+    }
+
+    @Override
     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
         if (!handshakeFuture.isDone()) handshakeFuture.setFailure(cause);
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketIdleEventHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketIdleEventHandler.java
deleted file mode 100644
index 2fb6df3..0000000
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketIdleEventHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver.handler;
-
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ChannelHandler.Sharable
-public class WebSocketIdleEventHandler extends ChannelDuplexHandler {
-    private static final Logger logger = LoggerFactory.getLogger(WebSocketIdleEventHandler.class);
-    private final ChannelGroup activeChannels;
-
-    public WebSocketIdleEventHandler(final ChannelGroup activeChannels) {
-        this.activeChannels = activeChannels;
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        activeChannels.add(ctx.channel());
-        super.channelActive(ctx);
-    }
-
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
-        if (event instanceof IdleStateEvent) {
-            IdleStateEvent e = (IdleStateEvent) event;
-            if (e.state() == IdleState.READER_IDLE) {
-                logger.warn("Server " + ctx.channel() + " has been idle for too long.");
-            } else if (e.state() == IdleState.WRITER_IDLE || e.state() == IdleState.ALL_IDLE) {
-                logger.info("Sending ping frame to the server");
-                ctx.writeAndFlush(new PingWebSocketFrame());
-            }
-        }
-    }
-}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java
index eb16c60..f93ea93 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java
@@ -44,8 +44,6 @@ public class WebsocketCloseHandler extends ChannelOutboundHandlerAdapter {
         }
     }
 
-
-
     @Override
     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
         ctx.channel().attr(CLOSE_WS_SENT).setIfAbsent(Boolean.FALSE);
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
index b1a47a6..651b1f3 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
@@ -20,7 +20,9 @@ package org.apache.tinkerpop.gremlin.driver.simple;
 
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.handler.codec.http.EmptyHttpHeaders;
+import io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
@@ -67,7 +69,8 @@ public class WebSocketClient extends AbstractClient {
             final WebSocketClientHandler wsHandler =
                     new WebSocketClientHandler(
                             WebSocketClientHandshakerFactory.newHandshaker(
-                                    uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 65536));
+                                    uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 65536),
+                            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
             final MessageSerializer serializer = new GraphBinaryMessageSerializerV1();
             b.channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>() {
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java
index 1dd62f5..72071ad 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java
@@ -407,8 +407,6 @@ public class ClientSingleRequestConnectionIntegrateTest extends AbstractGremlinS
     @Test
     public void testAbruptClose() throws InterruptedException {
         final Cluster cluster = this.createClusterWithXNumOfConnection(50);
-
-
         final Client.ClusteredClient client = cluster.connect();
 
         final int requests = 50;