You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/09/21 21:30:03 UTC

[2/4] incubator-tinkerpop git commit: Add a close op for sessions.

Add a close op for sessions.

When a session is closed on the client, the client now sends a message to the server to kill the session, which rolls back any incomplete transactions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/5f31904a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/5f31904a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/5f31904a

Branch: refs/heads/master
Commit: 5f31904a2d01df0b25bb61b6588eb8f63d7f74fc
Parents: 0e45e8c
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Sep 21 14:02:38 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Mon Sep 21 14:02:38 2015 -0400

----------------------------------------------------------------------
 docs/src/gremlin-applications.asciidoc          |  1 +
 .../apache/tinkerpop/gremlin/driver/Client.java |  8 +-
 .../tinkerpop/gremlin/driver/Connection.java    | 28 +++++-
 .../gremlin/driver/ConnectionPool.java          | 30 +++++--
 .../apache/tinkerpop/gremlin/driver/Tokens.java |  1 +
 .../server/channel/WebSocketChannelizer.java    |  4 +
 .../handler/WsGremlinCloseRequestDecoder.java   | 89 ++++++++++++++++++++
 .../server/op/AbstractEvalOpProcessor.java      | 12 ++-
 .../server/op/session/SessionOpProcessor.java   | 35 +++++++-
 .../server/op/standard/StandardOpProcessor.java |  5 ++
 .../server/GremlinDriverIntegrateTest.java      | 23 ++++-
 11 files changed, 219 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/docs/src/gremlin-applications.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/gremlin-applications.asciidoc b/docs/src/gremlin-applications.asciidoc
index 51c2219..88a1c0d 100644
--- a/docs/src/gremlin-applications.asciidoc
+++ b/docs/src/gremlin-applications.asciidoc
@@ -1003,6 +1003,7 @@ The "session" `OpProcessor` handles requests for the primary function of Gremlin
 !Key !Description
 !`authentication` !A request that contains the response to a server challenge for authentication
 !`eval` !Evaluate a Gremlin script provided as a `String`
+!`close` !Close the specified session and roll back any open transactions.
 !=========================================================
 |=========================================================
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index c2f8738..4076df9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -300,7 +300,7 @@ public abstract class Client {
             cluster.allHosts().forEach(host -> {
                 try {
                     // hosts that don't initialize connection pools will come up as a dead host
-                    hostConnectionPools.put(host, new ConnectionPool(host, cluster));
+                    hostConnectionPools.put(host, new ConnectionPool(host, this));
 
                     // added a new host to the cluster so let the load-balancer know
                     this.cluster.loadBalancingStrategy().onNew(host);
@@ -412,6 +412,10 @@ public abstract class Client {
             this.sessionId = sessionId;
         }
 
+        String getSessionId() {
+            return sessionId;
+        }
+
         @Override
         public Client rebind(final String graphOrTraversalSourceName){
             throw new UnsupportedOperationException("Sessioned client do no support rebinding");
@@ -445,7 +449,7 @@ public abstract class Client {
                     .stream().filter(Host::isAvailable).collect(Collectors.toList());
             Collections.shuffle(hosts);
             final Host host = hosts.get(0);
-            connectionPool = new ConnectionPool(host, cluster);
+            connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 0f2f62a..c7582f9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -49,6 +49,7 @@ final class Connection {
     private final URI uri;
     private final ConcurrentMap<UUID, ResultQueue> pending = new ConcurrentHashMap<>();
     private final Cluster cluster;
+    private final Client client;
     private final ConnectionPool pool;
 
     public static final int MAX_IN_PROCESS = 4;
@@ -72,9 +73,10 @@ final class Connection {
 
     private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
 
-    public Connection(final URI uri, final ConnectionPool pool, final Cluster cluster, final int maxInProcess) throws ConnectionException {
+    public Connection(final URI uri, final ConnectionPool pool, final int maxInProcess) throws ConnectionException {
         this.uri = uri;
-        this.cluster = cluster;
+        this.cluster = pool.getCluster();
+        this.client = pool.getClient();
         this.pool = pool;
         this.maxInProcess = maxInProcess;
 
@@ -122,6 +124,10 @@ final class Connection {
         return cluster;
     }
 
+    Client getClient() {
+        return client;
+    }
+
     ConcurrentMap<UUID, ResultQueue> getPending() {
         return pending;
     }
@@ -196,6 +202,24 @@ final class Connection {
     }
 
     private void shutdown(final CompletableFuture<Void> future) {
+        if (client instanceof Client.SessionedClient) {
+            final RequestMessage closeMessage = client.buildMessage(RequestMessage.build(Tokens.OPS_CLOSE));
+            final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
+            write(closeMessage, closed);
+
+            try {
+                // make sure we get a response here to validate that things closed as expected.  on error, we'll let
+                // the server try to clean up on its own.  the primary error here should probably be related to
+                // protocol issues which should not be something a user has to fuss with.
+                closed.get();
+            } catch (Exception ex) {
+                final String msg = String.format(
+                    "Encountered an error trying to close connection on %s - force closing - server will close session on shutdown or timeout.",
+                    ((Client.SessionedClient) client).getSessionId());
+                logger.warn(msg, ex);
+            }
+        }
+
         channel.writeAndFlush(new CloseWebSocketFrame());
         final ChannelPromise promise = channel.newPromise();
         promise.addListener(f -> {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 30cd71f..4c007ec 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -50,6 +51,7 @@ final class ConnectionPool {
 
     public final Host host;
     private final Cluster cluster;
+    private final Client client;
     private final List<Connection> connections;
     private final AtomicInteger open;
     private final Set<Connection> bin = new CopyOnWriteArraySet<>();
@@ -68,14 +70,20 @@ final class ConnectionPool {
     private final Lock waitLock = new ReentrantLock(true);
     private final Condition hasAvailableConnection = waitLock.newCondition();
 
-    public ConnectionPool(final Host host, final Cluster cluster) {
+    public ConnectionPool(final Host host, final Client client) {
+        this(host, client, Optional.empty(), Optional.empty());
+    }
+
+    public ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
+                          final Optional<Integer> overrideMaxPoolSize) {
         this.host = host;
-        this.cluster = cluster;
+        this.client = client;
+        this.cluster = client.cluster;
         poolLabel = String.format("Connection Pool {host=%s}", host);
 
         final Settings.ConnectionPoolSettings settings = settings();
-        this.minPoolSize = settings.minSize;
-        this.maxPoolSize = settings.maxSize;
+        this.minPoolSize = overrideMinPoolSize.orElse(settings.minSize);
+        this.maxPoolSize = overrideMaxPoolSize.orElse(settings.maxSize);
         this.minSimultaneousUsagePerConnection = settings.minSimultaneousUsagePerConnection;
         this.maxSimultaneousUsagePerConnection = settings.maxSimultaneousUsagePerConnection;
         this.minInProcess = settings.minInProcessPerConnection;
@@ -84,7 +92,7 @@ final class ConnectionPool {
 
         try {
             for (int i = 0; i < minPoolSize; i++)
-                l.add(new Connection(host.getHostUri(), this, cluster, settings.maxInProcessPerConnection));
+                l.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
         } catch (ConnectionException ce) {
             // ok if we don't get it initialized here - when a request is attempted in a connection from the
             // pool it will try to create new connections as needed.
@@ -198,6 +206,14 @@ final class ConnectionPool {
         }
     }
 
+    Client getClient() {
+        return client;
+    }
+
+    Cluster getCluster() {
+        return cluster;
+    }
+
     public boolean isClosed() {
         return closeFuture.get() != null;
     }
@@ -281,7 +297,7 @@ final class ConnectionPool {
         }
 
         try {
-            connections.add(new Connection(host.getHostUri(), this, cluster, settings().maxInProcessPerConnection));
+            connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
         } catch (ConnectionException ce) {
             logger.debug("Connections were under max, but there was an error creating the connection.", ce);
             considerUnavailable();
@@ -378,7 +394,7 @@ final class ConnectionPool {
         logger.debug("Trying to re-establish connection on {}", host);
 
         try {
-            connections.add(new Connection(host.getHostUri(), this, cluster, settings().maxInProcessPerConnection));
+            connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
             this.open.set(connections.size());
 
             // host is reconnected and a connection is now available

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
index ad0481d..447f42d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
@@ -30,6 +30,7 @@ public final class Tokens {
     private Tokens() {}
 
     public static final String OPS_SHOW = "show";
+    public static final String OPS_CLOSE = "close";
     public static final String OPS_EVAL = "eval";
     public static final String OPS_IMPORT = "import";
     public static final String OPS_INVALID = "invalid";

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
index 603a239..64e544c 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.server.AbstractChannelizer;
 import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
 import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.server.handler.WsGremlinBinaryRequestDecoder;
+import org.apache.tinkerpop.gremlin.server.handler.WsGremlinCloseRequestDecoder;
 import org.apache.tinkerpop.gremlin.server.handler.WsGremlinResponseEncoder;
 import org.apache.tinkerpop.gremlin.server.handler.WsGremlinTextRequestDecoder;
 import io.netty.channel.ChannelPipeline;
@@ -48,6 +49,7 @@ public class WebSocketChannelizer extends AbstractChannelizer {
     private WsGremlinResponseEncoder wsGremlinResponseEncoder;
     private WsGremlinTextRequestDecoder wsGremlinTextRequestDecoder;
     private WsGremlinBinaryRequestDecoder wsGremlinBinaryRequestDecoder;
+    private WsGremlinCloseRequestDecoder wsGremlinCloseRequestDecoder;
     private SaslAuthenticationHandler authenticationHandler;
 
     @Override
@@ -57,6 +59,7 @@ public class WebSocketChannelizer extends AbstractChannelizer {
         wsGremlinResponseEncoder = new WsGremlinResponseEncoder();
         wsGremlinTextRequestDecoder = new WsGremlinTextRequestDecoder(serializers);
         wsGremlinBinaryRequestDecoder = new WsGremlinBinaryRequestDecoder(serializers);
+        wsGremlinCloseRequestDecoder = new WsGremlinCloseRequestDecoder(serializers);
 
         // configure authentication - null means don't bother to add authentication to the pipeline
         if (authenticator != null)
@@ -94,6 +97,7 @@ public class WebSocketChannelizer extends AbstractChannelizer {
         pipeline.addLast("response-encoder", wsGremlinResponseEncoder);
         pipeline.addLast("request-text-decoder", wsGremlinTextRequestDecoder);
         pipeline.addLast("request-binary-decoder", wsGremlinBinaryRequestDecoder);
+        pipeline.addLast("request-close-decoder", wsGremlinCloseRequestDecoder);
 
         if (logger.isDebugEnabled())
             pipeline.addLast(new LoggingHandler("log-aggregator-encoder", LogLevel.DEBUG));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java
new file mode 100644
index 0000000..fc6f3cf
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
+import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Handles close requests over web sockets.  If the request is session-based then the close operation will not only close
+ * the channel but also close the session and rollback uncommitted transactions.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+@ChannelHandler.Sharable
+public class WsGremlinCloseRequestDecoder extends MessageToMessageDecoder<CloseWebSocketFrame> {
+    private static final Logger logger = LoggerFactory.getLogger(WsGremlinCloseRequestDecoder.class);
+
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+    private final Map<String, MessageSerializer> serializers;
+
+    public WsGremlinCloseRequestDecoder(final Map<String, MessageSerializer> serializers) {
+        this.serializers = serializers;
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext channelHandlerContext, final CloseWebSocketFrame frame, final List<Object> objects) throws Exception {
+        final ByteBuf messageBytes = frame.content();
+        final byte len = messageBytes.readByte();
+        if (len <= 0) {
+            objects.add(RequestMessage.INVALID);
+            return;
+        }
+
+        final ByteBuf contentTypeBytes = channelHandlerContext.alloc().buffer(len);
+        try {
+            messageBytes.readBytes(contentTypeBytes);
+            final String contentType = contentTypeBytes.toString(UTF8);
+            final MessageSerializer serializer = select(contentType, Serializers.DEFAULT_REQUEST_SERIALIZER);
+
+            channelHandlerContext.channel().attr(StateKey.SERIALIZER).set(serializer);
+            channelHandlerContext.channel().attr(StateKey.USE_BINARY).set(true);
+            try {
+                objects.add(serializer.deserializeRequest(messageBytes.discardReadBytes()));
+            } catch (SerializationException se) {
+                objects.add(RequestMessage.INVALID);
+            }
+        } finally {
+            contentTypeBytes.release();
+        }
+    }
+
+    private MessageSerializer select(final String mimeType, final MessageSerializer defaultSerializer) {
+        if (logger.isWarnEnabled() && !serializers.containsKey(mimeType))
+            logger.warn("Gremlin Server is not configured with a serializer for the requested mime type [{}] - using {} by default",
+                    mimeType, defaultSerializer.getClass().getName());
+
+        return serializers.getOrDefault(mimeType, defaultSerializer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
index bfeb120..cd502e3 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
@@ -82,6 +82,12 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
      */
     public abstract ThrowingConsumer<Context> getEvalOp();
 
+    /**
+     * A sub-class may have additional "ops" that it will service.  Calls to {@link #select(Context)} that are not
+     * handled will be passed to this method to see if the sub-class can service the requested op code.
+     */
+    public abstract Optional<ThrowingConsumer<Context>> selectOther(final RequestMessage requestMessage) throws OpProcessorException;
+
     @Override
     public ThrowingConsumer<Context> select(final Context ctx) throws OpProcessorException {
         final RequestMessage message = ctx.getRequestMessage();
@@ -96,8 +102,10 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
                 final String msgInvalid = String.format("Message could not be parsed.  Check the format of the request. [%s]", message);
                 throw new OpProcessorException(msgInvalid, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).result(msgInvalid).create());
             default:
-                final String msgDefault = String.format("Message with op code [%s] is not recognized.", message.getOp());
-                throw new OpProcessorException(msgDefault, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).result(msgDefault).create());
+                op = selectOther(message).orElseThrow(() -> {
+                    final String msgDefault = String.format("Message with op code [%s] is not recognized.", message.getOp());
+                    return new OpProcessorException(msgDefault, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).result(msgDefault).create());
+                });
         }
 
         return op;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
index bdeb02a..8a31fe9 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
@@ -43,8 +43,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import static com.codahale.metrics.MetricRegistry.name;
 
 /**
- * Simple {@link org.apache.tinkerpop.gremlin.server.OpProcessor} implementation that handles {@code ScriptEngine}
- * script evaluation in the context of a session.
+ * Simple {@link org.apache.tinkerpop.gremlin.server.OpProcessor} implementation that handles
+ * {@code ScriptEngine} script evaluation in the context of a session. Note that this processor will
+ * also take a "close" op to kill the session and rollback any incomplete transactions.
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
@@ -89,6 +90,34 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
         return OP_PROCESSOR_NAME;
     }
 
+    /**
+     * Session based requests accept a "close" operator in addition to "eval".  A close will trigger the session to be
+     * killed and any uncommitted transaction to be rolled-back.
+     */
+    @Override
+    public Optional<ThrowingConsumer<Context>> selectOther(final RequestMessage requestMessage)  throws OpProcessorException {
+        if (requestMessage.getOp().equals(Tokens.OPS_CLOSE)) {
+            // this must be an in-session request
+            if (!requestMessage.optionalArgs(Tokens.ARGS_SESSION).isPresent()) {
+                final String msg = String.format("A message with an [%s] op code requires a [%s] argument", Tokens.OPS_CLOSE, Tokens.ARGS_SESSION);
+                throw new OpProcessorException(msg, ResponseMessage.build(requestMessage).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).result(msg).create());
+            }
+
+            return Optional.of(ctx -> {
+                // validate the session is present and then remove it if it is.
+                final Session sessionToClose = sessions.get(requestMessage.getArgs().get(Tokens.ARGS_SESSION).toString());
+                if (null == sessionToClose) {
+                    final String msg = String.format("There was no session named %s to close", requestMessage.getArgs().get(Tokens.ARGS_SESSION).toString());
+                    throw new OpProcessorException(msg, ResponseMessage.build(requestMessage).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).result(msg).create());
+                }
+
+                sessionToClose.kill();
+            });
+        } else {
+            return Optional.empty();
+        }
+    }
+
     @Override
     public ThrowingConsumer<Context> getEvalOp() {
         return this::evalOp;
@@ -99,7 +128,7 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
         super.validateEvalMessage(message);
 
         if (!message.optionalArgs(Tokens.ARGS_SESSION).isPresent()) {
-            final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_EVAL, Tokens.ARGS_SESSION);
+            final String msg = String.format("A message with an [%s] op code requires a [%s] argument", Tokens.OPS_EVAL, Tokens.ARGS_SESSION);
             throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).result(msg).create());
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
index 42cf649..017594f 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
@@ -63,6 +63,11 @@ public class StandardOpProcessor extends AbstractEvalOpProcessor {
     }
 
     @Override
+    public Optional<ThrowingConsumer<Context>> selectOther(final RequestMessage requestMessage)  throws OpProcessorException {
+        return Optional.empty();
+    }
+
+    @Override
     public void close() throws Exception {
         // do nothing = no resources to release
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 0057d6d..98c9d61 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer;
@@ -507,6 +508,27 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     }
 
     @Test
+    public void shouldCloseSession() throws Exception {
+        final Cluster cluster = Cluster.build().create();
+        final Client client = cluster.connect(name.getMethodName());
+
+        final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
+        assertEquals(9, results1.all().get().size());
+        final ResultSet results2 = client.submit("x[0]+1");
+        assertEquals(2, results2.all().get().get(0).getInt());
+
+        client.close();
+
+        try {
+            client.submit("x[0]+1");
+            fail("Should have thrown an exception because the connection is closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(ConnectionException.class));
+        }
+    }
+
+    @Test
     public void shouldExecuteScriptInSessionAssumingDefaultedImports() throws Exception {
         final Cluster cluster = Cluster.build().create();
         final Client client = cluster.connect(name.getMethodName());
@@ -682,5 +704,4 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
 
         cluster.close();
     }
-
 }