You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/09/21 21:30:04 UTC

[3/4] incubator-tinkerpop git commit: Allow client side channelizer to send a final implementation specific close message.

Allow client side channelizer to send a final implementation specific close message.

The CloseWebSocketFrame gets sent by the channelizer now.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c8650ceb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c8650ceb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c8650ceb

Branch: refs/heads/master
Commit: c8650ceb3deb3d9aac859a9eaff0eea2cb48e7e5
Parents: 5f31904
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Sep 21 14:33:31 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Mon Sep 21 14:33:31 2015 -0400

----------------------------------------------------------------------
 .../tinkerpop/gremlin/driver/Channelizer.java   | 23 ++++++++++++++++++++
 .../tinkerpop/gremlin/driver/Connection.java    |  7 ++++--
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c8650ceb/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index ccf0f9b..bfd223f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
@@ -58,6 +60,14 @@ public interface Channelizer extends ChannelHandler {
     public void init(final Connection connection);
 
     /**
+     * Called on {@link Connection#close()} to perform any {@code Channelizer} specific functions.  Note that the
+     * {@link Connection} already calls {@code Channel.close()} so there is no need to call that method here.
+     * An implementation will typically use this method to send a {@code Channelizer} specific message to the
+     * server to notify of shutdown coming from the client side (e.g. a "close" websocket frame).
+     */
+    public void close(final Channel channel);
+
+    /**
      * Called after the channel connects. The {@code Channelizer} may need to perform some functions, such as a
      * handshake.
      */
@@ -88,6 +98,11 @@ public interface Channelizer extends ChannelHandler {
         }
 
         @Override
+        public void close(final Channel channel) {
+            // do nothing
+        }
+
+        @Override
         public void init(final Connection connection) {
             this.connection = connection;
             this.cluster = connection.getCluster();
@@ -142,6 +157,14 @@ public interface Channelizer extends ChannelHandler {
             webSocketGremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer());
         }
 
+        /**
+         * Sends a {@code CloseWebSocketFrame} to the server for the specified channel.
+         */
+        @Override
+        public void close(final Channel channel) {
+            channel.writeAndFlush(new CloseWebSocketFrame());
+        }
+
         @Override
         public boolean supportsSsl() {
             final String scheme = connection.getUri().getScheme();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c8650ceb/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index c7582f9..f31901e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -71,6 +71,8 @@ final class Connection {
 
     private final String connectionLabel;
 
+    private final Channelizer channelizer;
+
     private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
 
     public Connection(final URI uri, final ConnectionPool pool, final int maxInProcess) throws ConnectionException {
@@ -85,7 +87,7 @@ final class Connection {
         if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection while the cluster after close() is called");
 
         final Bootstrap b = this.cluster.getFactory().createBootstrap();
-        final Channelizer channelizer = new Channelizer.WebSocketChannelizer();
+        channelizer = new Channelizer.WebSocketChannelizer();
         channelizer.init(this);
         b.channel(NioSocketChannel.class).handler(channelizer);
 
@@ -203,6 +205,7 @@ final class Connection {
 
     private void shutdown(final CompletableFuture<Void> future) {
         if (client instanceof Client.SessionedClient) {
+            // TODO: consider delegating this back to the Client implementation
             final RequestMessage closeMessage = client.buildMessage(RequestMessage.build(Tokens.OPS_CLOSE));
             final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
             write(closeMessage, closed);
@@ -220,7 +223,7 @@ final class Connection {
             }
         }
 
-        channel.writeAndFlush(new CloseWebSocketFrame());
+        channelizer.close(channel);
         final ChannelPromise promise = channel.newPromise();
         promise.addListener(f -> {
             if (f.cause() != null)