You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/09/21 21:30:04 UTC
[3/4] incubator-tinkerpop git commit: Allow client side channelizer
to send a final implementation specific close message.
Allow client side channelizer to send a final implementation specific close message.
The CloseWebSocketFrame gets sent by the channelizer now.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c8650ceb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c8650ceb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c8650ceb
Branch: refs/heads/master
Commit: c8650ceb3deb3d9aac859a9eaff0eea2cb48e7e5
Parents: 5f31904
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Sep 21 14:33:31 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Mon Sep 21 14:33:31 2015 -0400
----------------------------------------------------------------------
.../tinkerpop/gremlin/driver/Channelizer.java | 23 ++++++++++++++++++++
.../tinkerpop/gremlin/driver/Connection.java | 7 ++++--
2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c8650ceb/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index ccf0f9b..bfd223f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -18,6 +18,8 @@
*/
package org.apache.tinkerpop.gremlin.driver;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
@@ -58,6 +60,14 @@ public interface Channelizer extends ChannelHandler {
public void init(final Connection connection);
/**
+ * Called on {@link Connection#close()} to perform an {@code Channelizer} specific functions. Note that the
+ * {@link Connection} already calls {@code Channel.close()} so there is no need to call that method here.
+ * An implementation will typically use this method to send a {@code Channelizer} specific message to the
+ * server to notify of shutdown coming from the client side (e.g. a "close" websocket frame).
+ */
+ public void close(final Channel channel);
+
+ /**
* Called after the channel connects. The {@code Channelizer} may need to perform some functions, such as a
* handshake.
*/
@@ -88,6 +98,11 @@ public interface Channelizer extends ChannelHandler {
}
@Override
+ public void close(final Channel channel) {
+ // do nothing
+ }
+
+ @Override
public void init(final Connection connection) {
this.connection = connection;
this.cluster = connection.getCluster();
@@ -142,6 +157,14 @@ public interface Channelizer extends ChannelHandler {
webSocketGremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer());
}
+ /**
+ * Sends a {@code CloseWebSocketFrame} to the server for the specified channel.
+ */
+ @Override
+ public void close(final Channel channel) {
+ channel.writeAndFlush(new CloseWebSocketFrame());
+ }
+
@Override
public boolean supportsSsl() {
final String scheme = connection.getUri().getScheme();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c8650ceb/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index c7582f9..f31901e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -71,6 +71,8 @@ final class Connection {
private final String connectionLabel;
+ private final Channelizer channelizer;
+
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
public Connection(final URI uri, final ConnectionPool pool, final int maxInProcess) throws ConnectionException {
@@ -85,7 +87,7 @@ final class Connection {
if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection while the cluster after close() is called");
final Bootstrap b = this.cluster.getFactory().createBootstrap();
- final Channelizer channelizer = new Channelizer.WebSocketChannelizer();
+ channelizer = new Channelizer.WebSocketChannelizer();
channelizer.init(this);
b.channel(NioSocketChannel.class).handler(channelizer);
@@ -203,6 +205,7 @@ final class Connection {
private void shutdown(final CompletableFuture<Void> future) {
if (client instanceof Client.SessionedClient) {
+ // maybe this should be delegated back to the Client implementation???
final RequestMessage closeMessage = client.buildMessage(RequestMessage.build(Tokens.OPS_CLOSE));
final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
write(closeMessage, closed);
@@ -220,7 +223,7 @@ final class Connection {
}
}
- channel.writeAndFlush(new CloseWebSocketFrame());
+ channelizer.close(channel);
final ChannelPromise promise = channel.newPromise();
promise.addListener(f -> {
if (f.cause() != null)