You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/09/21 21:30:03 UTC
[2/4] incubator-tinkerpop git commit: Add a close op for sessions.
Add a close op for sessions.
When a session closes on the client, it now sends a message to kill the session, which will thus roll back any incomplete transactions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/5f31904a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/5f31904a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/5f31904a
Branch: refs/heads/master
Commit: 5f31904a2d01df0b25bb61b6588eb8f63d7f74fc
Parents: 0e45e8c
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Sep 21 14:02:38 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Mon Sep 21 14:02:38 2015 -0400
----------------------------------------------------------------------
docs/src/gremlin-applications.asciidoc | 1 +
.../apache/tinkerpop/gremlin/driver/Client.java | 8 +-
.../tinkerpop/gremlin/driver/Connection.java | 28 +++++-
.../gremlin/driver/ConnectionPool.java | 30 +++++--
.../apache/tinkerpop/gremlin/driver/Tokens.java | 1 +
.../server/channel/WebSocketChannelizer.java | 4 +
.../handler/WsGremlinCloseRequestDecoder.java | 89 ++++++++++++++++++++
.../server/op/AbstractEvalOpProcessor.java | 12 ++-
.../server/op/session/SessionOpProcessor.java | 35 +++++++-
.../server/op/standard/StandardOpProcessor.java | 5 ++
.../server/GremlinDriverIntegrateTest.java | 23 ++++-
11 files changed, 219 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/docs/src/gremlin-applications.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/gremlin-applications.asciidoc b/docs/src/gremlin-applications.asciidoc
index 51c2219..88a1c0d 100644
--- a/docs/src/gremlin-applications.asciidoc
+++ b/docs/src/gremlin-applications.asciidoc
@@ -1003,6 +1003,7 @@ The "session" `OpProcessor` handles requests for the primary function of Gremlin
!Key !Description
!`authentication` !A request that contains the response to a server challenge for authentication
!`eval` !Evaluate a Gremlin script provided as a `String`
+!`close` !Close the specified session and roll back any open transactions.
!=========================================================
|=========================================================
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index c2f8738..4076df9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -300,7 +300,7 @@ public abstract class Client {
cluster.allHosts().forEach(host -> {
try {
// hosts that don't initialize connection pools will come up as a dead host
- hostConnectionPools.put(host, new ConnectionPool(host, cluster));
+ hostConnectionPools.put(host, new ConnectionPool(host, this));
// added a new host to the cluster so let the load-balancer know
this.cluster.loadBalancingStrategy().onNew(host);
@@ -412,6 +412,10 @@ public abstract class Client {
this.sessionId = sessionId;
}
+ String getSessionId() {
+ return sessionId;
+ }
+
@Override
public Client rebind(final String graphOrTraversalSourceName){
throw new UnsupportedOperationException("Sessioned client do no support rebinding");
@@ -445,7 +449,7 @@ public abstract class Client {
.stream().filter(Host::isAvailable).collect(Collectors.toList());
Collections.shuffle(hosts);
final Host host = hosts.get(0);
- connectionPool = new ConnectionPool(host, cluster);
+ connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 0f2f62a..c7582f9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -49,6 +49,7 @@ final class Connection {
private final URI uri;
private final ConcurrentMap<UUID, ResultQueue> pending = new ConcurrentHashMap<>();
private final Cluster cluster;
+ private final Client client;
private final ConnectionPool pool;
public static final int MAX_IN_PROCESS = 4;
@@ -72,9 +73,10 @@ final class Connection {
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
- public Connection(final URI uri, final ConnectionPool pool, final Cluster cluster, final int maxInProcess) throws ConnectionException {
+ public Connection(final URI uri, final ConnectionPool pool, final int maxInProcess) throws ConnectionException {
this.uri = uri;
- this.cluster = cluster;
+ this.cluster = pool.getCluster();
+ this.client = pool.getClient();
this.pool = pool;
this.maxInProcess = maxInProcess;
@@ -122,6 +124,10 @@ final class Connection {
return cluster;
}
+ Client getClient() {
+ return client;
+ }
+
ConcurrentMap<UUID, ResultQueue> getPending() {
return pending;
}
@@ -196,6 +202,24 @@ final class Connection {
}
private void shutdown(final CompletableFuture<Void> future) {
+ if (client instanceof Client.SessionedClient) {
+ final RequestMessage closeMessage = client.buildMessage(RequestMessage.build(Tokens.OPS_CLOSE));
+ final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
+ write(closeMessage, closed);
+
+ try {
+ // make sure we get a response here to validate that things closed as expected. on error, we'll let
+ // the server try to clean up on its own. the primary error here should probably be related to
+ // protocol issues which should not be something a user has to fuss with.
+ closed.get();
+ } catch (Exception ex) {
+ final String msg = String.format(
+ "Encountered an error trying to close connection on %s - force closing - server will close session on shutdown or timeout.",
+ ((Client.SessionedClient) client).getSessionId());
+ logger.warn(msg, ex);
+ }
+ }
+
channel.writeAndFlush(new CloseWebSocketFrame());
final ChannelPromise promise = channel.newPromise();
promise.addListener(f -> {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 30cd71f..4c007ec 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -50,6 +51,7 @@ final class ConnectionPool {
public final Host host;
private final Cluster cluster;
+ private final Client client;
private final List<Connection> connections;
private final AtomicInteger open;
private final Set<Connection> bin = new CopyOnWriteArraySet<>();
@@ -68,14 +70,20 @@ final class ConnectionPool {
private final Lock waitLock = new ReentrantLock(true);
private final Condition hasAvailableConnection = waitLock.newCondition();
- public ConnectionPool(final Host host, final Cluster cluster) {
+ public ConnectionPool(final Host host, final Client client) {
+ this(host, client, Optional.empty(), Optional.empty());
+ }
+
+ public ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
+ final Optional<Integer> overrideMaxPoolSize) {
this.host = host;
- this.cluster = cluster;
+ this.client = client;
+ this.cluster = client.cluster;
poolLabel = String.format("Connection Pool {host=%s}", host);
final Settings.ConnectionPoolSettings settings = settings();
- this.minPoolSize = settings.minSize;
- this.maxPoolSize = settings.maxSize;
+ this.minPoolSize = overrideMinPoolSize.orElse(settings.minSize);
+ this.maxPoolSize = overrideMaxPoolSize.orElse(settings.maxSize);
this.minSimultaneousUsagePerConnection = settings.minSimultaneousUsagePerConnection;
this.maxSimultaneousUsagePerConnection = settings.maxSimultaneousUsagePerConnection;
this.minInProcess = settings.minInProcessPerConnection;
@@ -84,7 +92,7 @@ final class ConnectionPool {
try {
for (int i = 0; i < minPoolSize; i++)
- l.add(new Connection(host.getHostUri(), this, cluster, settings.maxInProcessPerConnection));
+ l.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
} catch (ConnectionException ce) {
// ok if we don't get it initialized here - when a request is attempted in a connection from the
// pool it will try to create new connections as needed.
@@ -198,6 +206,14 @@ final class ConnectionPool {
}
}
+ Client getClient() {
+ return client;
+ }
+
+ Cluster getCluster() {
+ return cluster;
+ }
+
public boolean isClosed() {
return closeFuture.get() != null;
}
@@ -281,7 +297,7 @@ final class ConnectionPool {
}
try {
- connections.add(new Connection(host.getHostUri(), this, cluster, settings().maxInProcessPerConnection));
+ connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
} catch (ConnectionException ce) {
logger.debug("Connections were under max, but there was an error creating the connection.", ce);
considerUnavailable();
@@ -378,7 +394,7 @@ final class ConnectionPool {
logger.debug("Trying to re-establish connection on {}", host);
try {
- connections.add(new Connection(host.getHostUri(), this, cluster, settings().maxInProcessPerConnection));
+ connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
this.open.set(connections.size());
// host is reconnected and a connection is now available
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
index ad0481d..447f42d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
@@ -30,6 +30,7 @@ public final class Tokens {
private Tokens() {}
public static final String OPS_SHOW = "show";
+ public static final String OPS_CLOSE = "close";
public static final String OPS_EVAL = "eval";
public static final String OPS_IMPORT = "import";
public static final String OPS_INVALID = "invalid";
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
index 603a239..64e544c 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.server.AbstractChannelizer;
import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler;
import org.apache.tinkerpop.gremlin.server.handler.WsGremlinBinaryRequestDecoder;
+import org.apache.tinkerpop.gremlin.server.handler.WsGremlinCloseRequestDecoder;
import org.apache.tinkerpop.gremlin.server.handler.WsGremlinResponseEncoder;
import org.apache.tinkerpop.gremlin.server.handler.WsGremlinTextRequestDecoder;
import io.netty.channel.ChannelPipeline;
@@ -48,6 +49,7 @@ public class WebSocketChannelizer extends AbstractChannelizer {
private WsGremlinResponseEncoder wsGremlinResponseEncoder;
private WsGremlinTextRequestDecoder wsGremlinTextRequestDecoder;
private WsGremlinBinaryRequestDecoder wsGremlinBinaryRequestDecoder;
+ private WsGremlinCloseRequestDecoder wsGremlinCloseRequestDecoder;
private SaslAuthenticationHandler authenticationHandler;
@Override
@@ -57,6 +59,7 @@ public class WebSocketChannelizer extends AbstractChannelizer {
wsGremlinResponseEncoder = new WsGremlinResponseEncoder();
wsGremlinTextRequestDecoder = new WsGremlinTextRequestDecoder(serializers);
wsGremlinBinaryRequestDecoder = new WsGremlinBinaryRequestDecoder(serializers);
+ wsGremlinCloseRequestDecoder = new WsGremlinCloseRequestDecoder(serializers);
// configure authentication - null means don't bother to add authentication to the pipeline
if (authenticator != null)
@@ -94,6 +97,7 @@ public class WebSocketChannelizer extends AbstractChannelizer {
pipeline.addLast("response-encoder", wsGremlinResponseEncoder);
pipeline.addLast("request-text-decoder", wsGremlinTextRequestDecoder);
pipeline.addLast("request-binary-decoder", wsGremlinBinaryRequestDecoder);
+ pipeline.addLast("request-close-decoder", wsGremlinCloseRequestDecoder);
if (logger.isDebugEnabled())
pipeline.addLast(new LoggingHandler("log-aggregator-encoder", LogLevel.DEBUG));
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java
new file mode 100644
index 0000000..fc6f3cf
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
+import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Handles close requests over web sockets. If the request is session-based then the close operation will not only close
+ * the channel but also close the session and rollback uncommitted transactions.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+@ChannelHandler.Sharable
+public class WsGremlinCloseRequestDecoder extends MessageToMessageDecoder<CloseWebSocketFrame> {
+ private static final Logger logger = LoggerFactory.getLogger(WsGremlinCloseRequestDecoder.class);
+
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+ private final Map<String, MessageSerializer> serializers;
+
+ public WsGremlinCloseRequestDecoder(final Map<String, MessageSerializer> serializers) {
+ this.serializers = serializers;
+ }
+
+ @Override
+ protected void decode(final ChannelHandlerContext channelHandlerContext, final CloseWebSocketFrame frame, final List<Object> objects) throws Exception {
+ final ByteBuf messageBytes = frame.content();
+ final byte len = messageBytes.readByte();
+ if (len <= 0) {
+ objects.add(RequestMessage.INVALID);
+ return;
+ }
+
+ final ByteBuf contentTypeBytes = channelHandlerContext.alloc().buffer(len);
+ try {
+ messageBytes.readBytes(contentTypeBytes);
+ final String contentType = contentTypeBytes.toString(UTF8);
+ final MessageSerializer serializer = select(contentType, Serializers.DEFAULT_REQUEST_SERIALIZER);
+
+ channelHandlerContext.channel().attr(StateKey.SERIALIZER).set(serializer);
+ channelHandlerContext.channel().attr(StateKey.USE_BINARY).set(true);
+ try {
+ objects.add(serializer.deserializeRequest(messageBytes.discardReadBytes()));
+ } catch (SerializationException se) {
+ objects.add(RequestMessage.INVALID);
+ }
+ } finally {
+ contentTypeBytes.release();
+ }
+ }
+
+ private MessageSerializer select(final String mimeType, final MessageSerializer defaultSerializer) {
+ if (logger.isWarnEnabled() && !serializers.containsKey(mimeType))
+ logger.warn("Gremlin Server is not configured with a serializer for the requested mime type [{}] - using {} by default",
+ mimeType, defaultSerializer.getClass().getName());
+
+ return serializers.getOrDefault(mimeType, defaultSerializer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
index bfeb120..cd502e3 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
@@ -82,6 +82,12 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
*/
public abstract ThrowingConsumer<Context> getEvalOp();
+ /**
+ * A sub-class may have additional "ops" that it will service. Calls to {@link #select(Context)} that are not
+ * handled will be passed to this method to see if the sub-class can service the requested op code.
+ */
+ public abstract Optional<ThrowingConsumer<Context>> selectOther(final RequestMessage requestMessage) throws OpProcessorException;
+
@Override
public ThrowingConsumer<Context> select(final Context ctx) throws OpProcessorException {
final RequestMessage message = ctx.getRequestMessage();
@@ -96,8 +102,10 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
final String msgInvalid = String.format("Message could not be parsed. Check the format of the request. [%s]", message);
throw new OpProcessorException(msgInvalid, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).result(msgInvalid).create());
default:
- final String msgDefault = String.format("Message with op code [%s] is not recognized.", message.getOp());
- throw new OpProcessorException(msgDefault, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).result(msgDefault).create());
+ op = selectOther(message).orElseThrow(() -> {
+ final String msgDefault = String.format("Message with op code [%s] is not recognized.", message.getOp());
+ return new OpProcessorException(msgDefault, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).result(msgDefault).create());
+ });
}
return op;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
index bdeb02a..8a31fe9 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
@@ -43,8 +43,9 @@ import java.util.concurrent.ConcurrentHashMap;
import static com.codahale.metrics.MetricRegistry.name;
/**
- * Simple {@link org.apache.tinkerpop.gremlin.server.OpProcessor} implementation that handles {@code ScriptEngine}
- * script evaluation in the context of a session.
+ * Simple {@link org.apache.tinkerpop.gremlin.server.OpProcessor} implementation that handles
+ * {@code ScriptEngine} script evaluation in the context of a session. Note that this processor will
+ * also take a "close" op to kill the session and rollback any incomplete transactions.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
@@ -89,6 +90,34 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
return OP_PROCESSOR_NAME;
}
+ /**
+ * Session based requests accept a "close" operator in addition to "eval". A close will trigger the session to be
+ * killed and any uncommitted transaction to be rolled-back.
+ */
+ @Override
+ public Optional<ThrowingConsumer<Context>> selectOther(final RequestMessage requestMessage) throws OpProcessorException {
+ if (requestMessage.getOp().equals(Tokens.OPS_CLOSE)) {
+ // this must be an in-session request
+ if (!requestMessage.optionalArgs(Tokens.ARGS_SESSION).isPresent()) {
+ final String msg = String.format("A message with an [%s] op code requires a [%s] argument", Tokens.OPS_CLOSE, Tokens.ARGS_SESSION);
+ throw new OpProcessorException(msg, ResponseMessage.build(requestMessage).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).result(msg).create());
+ }
+
+ return Optional.of(ctx -> {
+ // validate the session is present and then remove it if it is.
+ final Session sessionToClose = sessions.get(requestMessage.getArgs().get(Tokens.ARGS_SESSION).toString());
+ if (null == sessionToClose) {
+ final String msg = String.format("There was no session named %s to close", requestMessage.getArgs().get(Tokens.ARGS_SESSION).toString());
+ throw new OpProcessorException(msg, ResponseMessage.build(requestMessage).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).result(msg).create());
+ }
+
+ sessionToClose.kill();
+ });
+ } else {
+ return Optional.empty();
+ }
+ }
+
@Override
public ThrowingConsumer<Context> getEvalOp() {
return this::evalOp;
@@ -99,7 +128,7 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
super.validateEvalMessage(message);
if (!message.optionalArgs(Tokens.ARGS_SESSION).isPresent()) {
- final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_EVAL, Tokens.ARGS_SESSION);
+ final String msg = String.format("A message with an [%s] op code requires a [%s] argument", Tokens.OPS_EVAL, Tokens.ARGS_SESSION);
throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).result(msg).create());
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
index 42cf649..017594f 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
@@ -63,6 +63,11 @@ public class StandardOpProcessor extends AbstractEvalOpProcessor {
}
@Override
+ public Optional<ThrowingConsumer<Context>> selectOther(final RequestMessage requestMessage) throws OpProcessorException {
+ return Optional.empty();
+ }
+
+ @Override
public void close() throws Exception {
// do nothing = no resources to release
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5f31904a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 0057d6d..98c9d61 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer;
@@ -507,6 +508,27 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
}
@Test
+ public void shouldCloseSession() throws Exception {
+ final Cluster cluster = Cluster.build().create();
+ final Client client = cluster.connect(name.getMethodName());
+
+ final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
+ assertEquals(9, results1.all().get().size());
+ final ResultSet results2 = client.submit("x[0]+1");
+ assertEquals(2, results2.all().get().get(0).getInt());
+
+ client.close();
+
+ try {
+ client.submit("x[0]+1");
+ fail("Should have thrown an exception because the connection is closed");
+ } catch (Exception ex) {
+ final Throwable root = ExceptionUtils.getRootCause(ex);
+ assertThat(root, instanceOf(ConnectionException.class));
+ }
+ }
+
+ @Test
public void shouldExecuteScriptInSessionAssumingDefaultedImports() throws Exception {
final Cluster cluster = Cluster.build().create();
final Client client = cluster.connect(name.getMethodName());
@@ -682,5 +704,4 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
cluster.close();
}
-
}