You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2021/04/05 16:58:44 UTC

[tinkerpop] 02/07: TINKERPOP-2245 Refactored the session side of UnifiedChannelizer

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit a19c2867c4d312a64a9f8cdb4c182ab1304d09c2
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Wed Mar 31 14:33:31 2021 -0400

    TINKERPOP-2245 Refactored the session side of UnifiedChannelizer
    
    MultiRexster didn't have quite the same semantics as the SessionOpProcessor. Had to add in an option to configure the MultiRexster to work that way. Also sorted out some issues with timeouts with sessions which required a RexsterExecutorService so that a Thread processing a Rexster could be interrupted independently of the Future as calling cancel() on the Future is something that can only be done once and if you wish to preserve state between failures of requests in a session (i.e. t [...]
---
 docs/src/upgrade/release-3.5.x.asciidoc            |  6 ++
 .../apache/tinkerpop/gremlin/driver/Client.java    | 19 ++++++
 .../apache/tinkerpop/gremlin/driver/Tokens.java    |  7 ++
 .../apache/tinkerpop/gremlin/server/Context.java   |  4 ++
 .../gremlin/server/handler/AbstractRexster.java    | 77 ++++++++++++++++------
 .../gremlin/server/handler/MultiRexster.java       | 59 ++++++++++++-----
 .../tinkerpop/gremlin/server/handler/Rexster.java  |  5 ++
 .../gremlin/server/handler/UnifiedHandler.java     | 38 +++++++++--
 .../server/util/RexsterExecutorService.java        | 50 ++++++++++++++
 .../gremlin/server/util/RexsterFutureTask.java     | 44 +++++++++++++
 .../gremlin/server/util/ServerGremlinExecutor.java | 14 +++-
 .../AbstractGremlinServerIntegrationTest.java      |  5 ++
 .../gremlin/server/GremlinDriverIntegrateTest.java | 47 +++----------
 .../gremlin/server/GremlinServerIntegrateTest.java |  4 ++
 .../server/GremlinServerSessionIntegrateTest.java  | 61 +++++++++++++++--
 .../src/test/resources/log4j-test.properties       |  3 +
 16 files changed, 357 insertions(+), 86 deletions(-)

diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index 3b99d58..0663891 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -420,6 +420,12 @@ Just some notes for later:
 will close, remaining requests in the queue will be ignored and rollback will occur.
 * care should be take with strict transaction management and multi-graph transactions (which aren't real - not a new thing)
 * absolute max lifetime of a session is a new thing
+* transaction semantics under unified
+** user manually calls commit(), which commits the transaction
+** user manually calls rollback()
+** user manually calls close() on Cluster
+** user manually calls close() on Tx or GraphTraversalSource spawned from Transaction
+** server error
 
 ==== Gremlin Server Audit Logging
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index ac14c85..3ad22cf 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -688,6 +688,7 @@ public abstract class Client {
     public final static class SessionedClient extends Client {
         private final String sessionId;
         private final boolean manageTransactions;
+        private final boolean maintainStateAfterException;
 
         private ConnectionPool connectionPool;
 
@@ -697,6 +698,7 @@ public abstract class Client {
             super(cluster, settings);
             this.sessionId = settings.getSession().get().sessionId;
             this.manageTransactions = settings.getSession().get().manageTransactions;
+            this.maintainStateAfterException = settings.getSession().get().maintainStateAfterException;
         }
 
         /**
@@ -714,6 +716,7 @@ public abstract class Client {
             builder.processor("session");
             builder.addArg(Tokens.ARGS_SESSION, sessionId);
             builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions);
+            builder.addArg(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION, maintainStateAfterException);
             return builder;
         }
 
@@ -838,11 +841,13 @@ public abstract class Client {
         private final boolean manageTransactions;
         private final String sessionId;
         private final boolean forceClosed;
+        private final boolean maintainStateAfterException;
 
         private SessionSettings(final Builder builder) {
             manageTransactions = builder.manageTransactions;
             sessionId = builder.sessionId;
             forceClosed = builder.forceClosed;
+            maintainStateAfterException = builder.maintainStateAfterException;
         }
 
         /**
@@ -867,6 +872,10 @@ public abstract class Client {
             return forceClosed;
         }
 
+        public boolean maintainStateAfterException() {
+            return maintainStateAfterException;
+        }
+
         public static SessionSettings.Builder build() {
             return new SessionSettings.Builder();
         }
@@ -875,11 +884,21 @@ public abstract class Client {
             private boolean manageTransactions = false;
             private String sessionId = UUID.randomUUID().toString();
             private boolean forceClosed = false;
+            private boolean maintainStateAfterException = false;
 
             private Builder() {
             }
 
             /**
+             * If enabled, errors related to individual request timeouts or errors during processing will not result
+             * in a close of the session itself.
+             */
+            public Builder maintainStateAfterException(final boolean maintainStateAfterException) {
+                this.maintainStateAfterException = maintainStateAfterException;
+                return this;
+            }
+
+            /**
              * If enabled, transactions will be "managed" such that each request will represent a complete transaction.
              * By default this value is {@code false}.
              */
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
index 5617127..ba61e46 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
@@ -78,6 +78,13 @@ public final class Tokens {
     public static final String ARGS_HOST = "host";
     public static final String ARGS_SESSION = "session";
     public static final String ARGS_MANAGE_TRANSACTION = "manageTransaction";
+
+    /**
+     * Argument name that is intended to be used with a session which when its value is {@code true} makes it so
+     * that a processing error or request timeout will not close the session, but leave it to continue processing in
+     * whatever state it may hold.
+     */
+    public static final String ARGS_MAINTAIN_STATE_AFTER_EXCEPTION = "maintainStateAfterException";
     public static final String ARGS_SASL = "sasl";
     public static final String ARGS_SASL_MECHANISM = "saslMechanism";
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index fcd2072..b77c883 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -76,6 +76,10 @@ public class Context {
         return requestTimeout;
     }
 
+    public boolean isFinalResponseWritten() {
+        return this.finalResponseWritten.get();
+    }
+
     public RequestContentType getRequestContentType() {
         return requestContentType;
     }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
index 7f7c51c..e135b6c 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
@@ -67,6 +67,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -84,36 +85,52 @@ public abstract class AbstractRexster implements Rexster, AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(AbstractRexster.class);
     private static final Logger auditLogger = LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME);
 
+    private final boolean sessionIdOnRequest;
     private final Channel initialChannel;
     private final boolean transactionManaged;
     private final String sessionId;
     private final AtomicReference<ScheduledFuture<?>> sessionCancelFuture = new AtomicReference<>();
     private final AtomicReference<Future<?>> sessionFuture = new AtomicReference<>();
-    private long actualTimeoutLength = 0;
-    private boolean actualTimeoutCausedBySession = false;
+    private long actualTimeoutLengthWhenClosed = 0;
+    private Thread sessionThread;
+    protected final boolean maintainStateAfterException;
+    protected final AtomicReference<CloseReason> closeReason = new AtomicReference<>();
     protected final GraphManager graphManager;
     protected final ConcurrentMap<String, Rexster> sessions;
     protected final Set<String> aliasesUsedByRexster = new HashSet<>();
 
+    protected enum CloseReason { UNDETERMINED, CHANNEL_CLOSED, SESSION_TIMEOUT, REQUEST_TIMEOUT, NORMAL }
+
     AbstractRexster(final Context gremlinContext, final String sessionId,
-                    final boolean transactionManaged, final ConcurrentMap<String, Rexster> sessions) {
+                    final boolean transactionManaged,
+                    final ConcurrentMap<String, Rexster> sessions) {
+        // this only applies to sessions
+        this.maintainStateAfterException = (boolean) gremlinContext.getRequestMessage().
+                optionalArgs(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION).orElse(false);
+        this.sessionIdOnRequest = gremlinContext.getRequestMessage().optionalArgs(Tokens.ARGS_SESSION).isPresent();
         this.transactionManaged = transactionManaged;
         this.sessionId = sessionId;
         this.initialChannel = gremlinContext.getChannelHandlerContext().channel();
 
         // close Rexster if the channel closes to cleanup and close transactions
         this.initialChannel.closeFuture().addListener(f -> {
-            // cancel session worker or it will keep waiting for items to appear in the session queue
-            final Future<?> sf = sessionFuture.get();
-            if (sf != null && !sf.isDone()) {
-                sf.cancel(true);
+            if (closeReason.compareAndSet(null, CloseReason.CHANNEL_CLOSED)) {
+                // cancel session worker or it will keep waiting for items to appear in the session queue
+                cancel(true);
+                close();
             }
-            close();
         });
         this.sessions = sessions;
         this.graphManager = gremlinContext.getGraphManager();
     }
 
+    protected synchronized void cancel(final boolean mayInterruptIfRunning) {
+        final FutureTask<?> sf = (FutureTask) sessionFuture.get();
+        if (sf != null && !sf.isDone()) {
+            sf.cancel(mayInterruptIfRunning);
+        }
+    }
+
     public boolean isTransactionManaged() {
         return transactionManaged;
     }
@@ -126,18 +143,22 @@ public abstract class AbstractRexster implements Rexster, AutoCloseable {
         return channel == initialChannel;
     }
 
-    public long getActualTimeoutLength() {
-        return actualTimeoutLength;
+    public long getActualTimeoutLengthWhenClosed() {
+        return actualTimeoutLengthWhenClosed;
     }
 
-    public boolean isActualTimeoutCausedBySession() {
-        return actualTimeoutCausedBySession;
+    public Optional<CloseReason> getCloseReason() {
+        return Optional.ofNullable(closeReason.get());
     }
 
     public GremlinScriptEngine getScriptEngine(final Context context, final String language) {
         return context.getGremlinExecutor().getScriptEngineManager().getEngineByName(language);
     }
 
+    public void setSessionThread(final Thread runner) {
+        this.sessionThread = runner;
+    }
+
     @Override
     public void setSessionCancelFuture(final ScheduledFuture<?> f) {
         if (!sessionCancelFuture.compareAndSet(null, f))
@@ -156,9 +177,16 @@ public abstract class AbstractRexster implements Rexster, AutoCloseable {
         // for final cleanup
         final Future<?> f = sessionFuture.get();
         if (f != null && !f.isDone()) {
-            actualTimeoutCausedBySession = causedBySession;
-            actualTimeoutLength = timeout;
-            sessionFuture.get().cancel(true);
+            if (closeReason.compareAndSet(null, causedBySession ? CloseReason.SESSION_TIMEOUT : CloseReason.REQUEST_TIMEOUT)) {
+                actualTimeoutLengthWhenClosed = timeout;
+
+                // if caused by a session timeout for a session OR if it is a request timeout for a sessionless
+                // request then we can just straight cancel() the Rexster instance
+                if (causedBySession || !sessionIdOnRequest)
+                    cancel(true);
+                else
+                    sessionThread.interrupt();
+            }
         }
     }
 
@@ -224,11 +252,22 @@ public abstract class AbstractRexster implements Rexster, AutoCloseable {
         if (root instanceof InterruptedException ||
                 root instanceof TraversalInterruptedException ||
                 root instanceof InterruptedIOException) {
-            final String msg = actualTimeoutCausedBySession ?
-                    String.format("Session closed - %s - sessionLifetimeTimeout of %s ms exceeded", sessionId, actualTimeoutLength) :
-                    String.format("Evaluation exceeded timeout threshold of %s ms", actualTimeoutLength);
+            String msg = "Processing interrupted but the reason why was not known";
+            switch (closeReason.get()) {
+                case CHANNEL_CLOSED:
+                    msg = "Processing interrupted because the channel was closed";
+                    break;
+                case SESSION_TIMEOUT:
+                    msg = String.format("Session closed - %s - sessionLifetimeTimeout of %s ms exceeded", sessionId, actualTimeoutLengthWhenClosed);
+                    break;
+                case REQUEST_TIMEOUT:
+                    msg = String.format("Evaluation exceeded timeout threshold of %s ms", actualTimeoutLengthWhenClosed);
+                    break;
+            }
+            final ResponseStatusCode code = closeReason.get() == CloseReason.SESSION_TIMEOUT || closeReason.get() == CloseReason.REQUEST_TIMEOUT ?
+                    ResponseStatusCode.SERVER_ERROR_TIMEOUT : ResponseStatusCode.SERVER_ERROR;
             throw new RexsterException(msg, root, ResponseMessage.build(gremlinContext.getRequestMessage())
-                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .code(code)
                     .statusMessage(msg).create());
         }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
index 539c16b..a54b905 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
@@ -87,7 +87,6 @@ public class MultiRexster extends AbstractRexster {
 
     @Override
     public void addTask(final Context gremlinContext) {
-        // todo: explicitly reject request???
         if (acceptingTasks())
             queue.offer(gremlinContext);
     }
@@ -96,33 +95,48 @@ public class MultiRexster extends AbstractRexster {
     public void run() {
         // there must be one item in the queue at least since addTask() gets called before the worker
         // is ever started
-        Context gremlinContext = queue.poll();
-        if (null == gremlinContext)
+        Context currentGremlinContext = queue.poll();
+        if (null == currentGremlinContext)
             throw new IllegalStateException(String.format("Worker has no initial context for session: %s", getSessionId()));
 
         try {
-            startTransaction(gremlinContext);
+            startTransaction(currentGremlinContext);
             try {
                 while (true) {
                     // schedule timeout for the current request from the queue
-                    final long seto = gremlinContext.getRequestTimeout();
+                    final long seto = currentGremlinContext.getRequestTimeout();
                     requestCancelFuture = scheduledExecutorService.schedule(
                             () -> this.triggerTimeout(seto, false),
                             seto, TimeUnit.MILLISECONDS);
 
-                    process(gremlinContext);
+                    // only stop processing stuff in the queue if this Rexster isn't configured to hold state between
+                    // exceptions (i.e. the old OpProcessor way) or if this Rexster is closing down by certain death
+                    // (i.e. channel close or lifetime session timeout)
+                    try {
+                        process(currentGremlinContext);
+                    } catch (RexsterException ex) {
+                        if (!maintainStateAfterException || closeReason.get() == CloseReason.CHANNEL_CLOSED ||
+                            closeReason.get() == CloseReason.SESSION_TIMEOUT) {
+                            throw ex;
+                        }
+
+                        // reset the close reason as we are maintaining state
+                        closeReason.set(null);
+
+                        logger.warn(ex.getMessage(), ex);
+                        currentGremlinContext.writeAndFlush(ex.getResponseMessage());
+                    }
 
                     // work is done within the timeout period so cancel it
                     cancelRequestTimeout();
 
-                    gremlinContext = queue.take();
+                    currentGremlinContext = queue.take();
                 }
             } catch (Exception ex) {
-                // stop accepting requests on this worker since it is heading to close()
-                ending.set(true);
+                stopAcceptingRequests();
 
                 // the current context gets its exception handled...
-                handleException(gremlinContext, ex);
+                handleException(currentGremlinContext, ex);
             }
         } catch (RexsterException rexex) {
             // remaining work items in the queue are ignored since this worker is closing. must send
@@ -134,7 +148,7 @@ public class MultiRexster extends AbstractRexster {
                         .code(ResponseStatusCode.SERVER_ERROR)
                         .statusMessage(String.format(
                                 "An earlier request [%s] failed prior to this one having a chance to execute",
-                                gremlinContext.getRequestMessage().getRequestId())).create());
+                                currentGremlinContext.getRequestMessage().getRequestId())).create());
             }
 
             // exception should trigger a rollback in the session. a more focused rollback may have occurred
@@ -142,15 +156,25 @@ public class MultiRexster extends AbstractRexster {
             // the request
             closeTransactionSafely(Transaction.Status.ROLLBACK);
 
-            logger.warn(rexex.getMessage(), rexex);
-            gremlinContext.writeAndFlush(rexex.getResponseMessage());
+            // the current context could already be completed with SUCCESS and we're just waiting for another
+            // one to show up while a timeout occurs or the channel closes. in these cases, this would be a valid
+            // close in all likelihood so there's no reason to log or alert the client as the client already has
+            // the best answer
+            if (!currentGremlinContext.isFinalResponseWritten()) {
+                logger.warn(rexex.getMessage(), rexex);
+                currentGremlinContext.writeAndFlush(rexex.getResponseMessage());
+            }
         } finally {
-            close();
+            // if this is a normal end to the session or if the session life timeout is exceeded then the
+            // session needs to be removed and everything cleaned up
+            if (closeReason.compareAndSet(null, CloseReason.NORMAL) || closeReason.get() == CloseReason.SESSION_TIMEOUT) {
+                close();
+            }
         }
     }
 
     @Override
-    public synchronized void close() {
+    public void close() {
         ending.set(true);
         cancelRequestTimeout();
         super.close();
@@ -162,6 +186,11 @@ public class MultiRexster extends AbstractRexster {
             requestCancelFuture.cancel(true);
     }
 
+    private void stopAcceptingRequests() {
+        ending.set(true);
+        cancel(true);
+    }
+
     @Override
     protected Bindings getWorkerBindings() throws RexsterException {
         if (null == bindings)
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
index 70869dd..41b39f6 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
@@ -50,6 +50,11 @@ public interface Rexster extends Runnable {
     void setSessionFuture(final Future<?> f);
 
     /**
+     * Sets a reference to the all powerful thread that is running this Rexster.
+     */
+    void setSessionThread(final Thread t);
+
+    /**
      * Provides a general way to tell Rexster that it has exceeded some timeout condition.
      */
     void triggerTimeout(final long timeout, boolean causedBySession);
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
index 7483062..5cad751 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
@@ -149,19 +149,19 @@ public class UnifiedHandler extends SimpleChannelInboundHandler<RequestMessage>
             if (sessions.containsKey(sessionId)) {
                 final Rexster rexster = sessions.get(sessionId);
 
-                // check if the session is still accepting requests - if not block further requests
-                if (!rexster.acceptingTasks()) {
-                    final String sessionClosedMessage = String.format(
-                            "Session %s is no longer accepting requests as it has been closed", sessionId);
+                // check if the session is bound to this channel, thus one client per session
+                if (!rexster.isBoundTo(gremlinContext.getChannelHandlerContext().channel())) {
+                    final String sessionClosedMessage = String.format("Session %s is not bound to the connecting client", sessionId);
                     final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
                             .statusMessage(sessionClosedMessage).create();
                     ctx.writeAndFlush(response);
                     return;
                 }
 
-                // check if the session is bound to this channel, thus one client per session
-                if (!rexster.isBoundTo(gremlinContext.getChannelHandlerContext().channel())) {
-                    final String sessionClosedMessage = String.format("Session %s is not bound to the connecting client", sessionId);
+                // check if the session is still accepting requests - if not block further requests
+                if (!rexster.acceptingTasks()) {
+                    final String sessionClosedMessage = String.format(
+                            "Session %s is no longer accepting requests as it has been closed", sessionId);
                     final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
                             .statusMessage(sessionClosedMessage).create();
                     ctx.writeAndFlush(response);
@@ -201,6 +201,30 @@ public class UnifiedHandler extends SimpleChannelInboundHandler<RequestMessage>
             throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
         }
 
+        if (message.optionalArgs(Tokens.ARGS_SESSION).isPresent()) {
+            final Optional<Object> mtx = message.optionalArgs(Tokens.ARGS_MANAGE_TRANSACTION);
+            if (mtx.isPresent() && !(mtx.get() instanceof Boolean)) {
+                final String msg = String.format("%s argument must be of type boolean", Tokens.ARGS_MANAGE_TRANSACTION);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            final Optional<Object> msae = message.optionalArgs(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION);
+            if (msae.isPresent() && !(msae.get() instanceof Boolean)) {
+                final String msg = String.format("%s argument must be of type boolean", Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+        } else {
+            if (message.optionalArgs(Tokens.ARGS_MANAGE_TRANSACTION).isPresent()) {
+                final String msg = String.format("%s argument only applies to requests made for sessions", Tokens.ARGS_MANAGE_TRANSACTION);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            if (message.optionalArgs(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION).isPresent()) {
+                final String msg = String.format("%s argument only applies to requests made for sessions", Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+        }
+
         if (message.optionalArgs(Tokens.ARGS_BINDINGS).isPresent()) {
             final Map bindings = (Map) message.getArgs().get(Tokens.ARGS_BINDINGS);
             if (IteratorUtils.anyMatch(bindings.keySet().iterator(), k -> null == k || !(k instanceof String))) {
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterExecutorService.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterExecutorService.java
new file mode 100644
index 0000000..1ab738c
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterExecutorService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.util;
+
+import org.apache.tinkerpop.gremlin.server.handler.Rexster;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A special {@code ThreadPoolExecutor} which will construct {@link RexsterFutureTask} instances and inject the
+ * current running thread into a {@link Rexster} instance if one is present.
+ */
+public class RexsterExecutorService extends ThreadPoolExecutor {
+
+    public RexsterExecutorService(final int nThreads, final ThreadFactory threadFactory) {
+        super(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
+        return new RexsterFutureTask<>(runnable, value);
+    }
+
+
+    @Override
+    protected void beforeExecute(final Thread t, final Runnable r) {
+        if (r instanceof RexsterFutureTask)
+            ((RexsterFutureTask<?>) r).getRexster().ifPresent(rex -> rex.setSessionThread(t));
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterFutureTask.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterFutureTask.java
new file mode 100644
index 0000000..6aebfdb
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterFutureTask.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.util;
+
+import org.apache.tinkerpop.gremlin.server.handler.Rexster;
+
+import java.util.Optional;
+import java.util.concurrent.FutureTask;
+
+/**
+ * A cancellable asynchronous operation with the added ability to get a {@code Rexster} instance if the
+ * {@code Runnable} for the task was of that type.
+ */
+public class RexsterFutureTask<V> extends FutureTask<V> {
+
+    private final Rexster rexster;
+
+    public RexsterFutureTask(final Runnable runnable, final  V result) {
+        super(runnable, result);
+
+        // hold an instance to the Rexster instance if it is of that type
+        this.rexster = runnable instanceof Rexster ? (Rexster) runnable : null;
+    }
+
+    public Optional<Rexster> getRexster() {
+        return Optional.ofNullable(this.rexster);
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
index 67a1608..0102956 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.server.Channelizer;
 import org.apache.tinkerpop.gremlin.server.GraphManager;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
 import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +70,8 @@ public class ServerGremlinExecutor {
      * {@code scheduleExecutorServiceClass} is set to {@code null} it will be created via
      * {@link Executors#newScheduledThreadPool(int, ThreadFactory)}.  If either of the {@link ExecutorService}
      * instances are supplied, the {@link Settings#gremlinPool} value will be ignored for the pool size.
+     *
+     * @param gremlinExecutorService Expects a RexsterExecutorService if using the {@link UnifiedChannelizer}.
      */
     public ServerGremlinExecutor(final Settings settings, final ExecutorService gremlinExecutorService,
                                  final ScheduledExecutorService scheduledExecutorService) {
@@ -93,8 +96,17 @@ public class ServerGremlinExecutor {
 
         if (null == gremlinExecutorService) {
             final ThreadFactory threadFactoryGremlin = ThreadFactoryUtil.create("exec-%d");
-            this.gremlinExecutorService = Executors.newFixedThreadPool(settings.gremlinPool, threadFactoryGremlin);
+
+            // RexsterExecutorService adds some important bits that are helpful to the UnifiedChannelizer, but
+            // using it generally should really have no ill effect to the old OpProcessor stuff or GremlinExecutor
+            // in general.
+            this.gremlinExecutorService = new RexsterExecutorService(settings.gremlinPool, threadFactoryGremlin);
         } else {
+            if (settings.channelizer.equals(UnifiedChannelizer.class.getName())) {
+                logger.error("The {} requires use of a {} for the GremlinExecutor but a {} was provided instead",
+                        settings.channelizer, RexsterExecutorService.class.getName(), gremlinExecutorService.getClass().getName());
+            }
+
             this.gremlinExecutorService = gremlinExecutorService;
         }
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
index deb1734..c443d6d 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
@@ -161,6 +161,11 @@ public abstract class AbstractGremlinServerIntegrationTest {
         OpLoader.reset();
     }
 
+    protected boolean isUsingUnifiedChannelizer() { // true when the server settings name UnifiedChannelizer as the channelizer class
+        return server.getServerGremlinExecutor().
+                getSettings().channelizer.equals(UnifiedChannelizer.class.getName());
+    }
+
     public static boolean deleteDirectory(final File directory) {
         if (directory.exists()) {
             final File[] files = directory.listFiles();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index c11cad8..ce93582 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -42,7 +42,6 @@ import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
 import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.handler.OpExecutorHandler;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -100,7 +99,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.core.StringStartsWith.startsWith;
-import static org.junit.Assume.assumeThat;
 import static org.mockito.Mockito.verify;
 
 /**
@@ -1615,7 +1613,10 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.open();
 
         try {
-            final Client client = cluster.connect(name.getMethodName());
+            // this configures the client to behave like OpProcessor for UnifiedChannelizer
+            final Client.SessionSettings settings = Client.SessionSettings.build().
+                    sessionId(name.getMethodName()).maintainStateAfterException(true).create();
+            final Client client = cluster.connect(Client.Settings.build().useSession(settings).create());
 
             for (int index = 0; index < 50; index++) {
                 final CompletableFuture<ResultSet> first = client.submitAsync(
@@ -1647,48 +1648,16 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                 final CompletableFuture<List<Result>> futureThird = third.get().all();
                 final CompletableFuture<List<Result>> futureFourth = fourth.get().all();
 
-                // there is slightly different assertion logic with UnifiedChannelizer given differences in session
-                // behavior where UnfiedChannelizer sessions won't continue processing in the face of a timeout and
-                // a new session will need to be created
-                if (server.getServerGremlinExecutor().getSettings().channelizer.equals(UnifiedChannelizer.class.getName())) {
-                    // first timesout and the rest get SERVER_ERROR
-                    try {
-                        futureFirst.get();
-                        fail("Should have timed out");
-                    } catch (Exception ex) {
-                        final Throwable root = ExceptionUtils.getRootCause(ex);
-                        assertThat(root, instanceOf(ResponseException.class));
-                        assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode());
-                        assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
-                    }
-
-                    assertFutureTimeoutUnderUnified(futureSecond);
-                    assertFutureTimeoutUnderUnified(futureThird);
-                    assertFutureTimeoutUnderUnified(futureFourth);
-                } else {
-                    assertFutureTimeout(futureFirst);
-                    assertFutureTimeout(futureSecond);
-                    assertFutureTimeout(futureThird);
-                    assertFutureTimeout(futureFourth);
-                }
+                assertFutureTimeout(futureFirst);
+                assertFutureTimeout(futureSecond);
+                assertFutureTimeout(futureThird);
+                assertFutureTimeout(futureFourth);
             }
         } finally {
             cluster.close();
         }
     }
 
-    private void assertFutureTimeoutUnderUnified(final CompletableFuture<List<Result>> f) {
-        try {
-            f.get();
-            fail("Should have timed out");
-        } catch (Exception ex) {
-            final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root, instanceOf(ResponseException.class));
-            assertEquals(ResponseStatusCode.SERVER_ERROR, ((ResponseException) root).getResponseStatusCode());
-            assertThat(root.getMessage(), allOf(startsWith("An earlier request"), endsWith("failed prior to this one having a chance to execute")));
-        }
-    }
-
     private void assertFutureTimeout(final CompletableFuture<List<Result>> f) {
         try {
             f.get();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 270bed2..a170185 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -88,6 +88,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
 
 /**
  * Integration tests for server-side settings and processing.
@@ -365,6 +366,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
 
     @Test
     public void shouldProduceProperExceptionOnTimeout() throws Exception {
+        // this test will not work quite right on UnifiedChannelizer
+        assumeThat("Must use OpProcessor", isUsingUnifiedChannelizer(), is(false));
+
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect(name.getMethodName());
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index fe9ff99..6d07558 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -23,6 +23,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.RequestOptions;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
@@ -54,6 +55,7 @@ import static org.hamcrest.core.IsIterableContaining.hasItem;
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
 
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
@@ -107,6 +109,7 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
                 settings.sessionLifetimeTimeout = 3000L;
                 break;
             case "shouldCloseSessionOnClientClose":
+            case "shouldCloseSessionOnClientCloseWithStateMaintainedBetweenExceptions":
                 clearNeo4j(settings);
                 break;
             case "shouldEnsureSessionBindingsAreThreadSafe":
@@ -144,11 +147,6 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         return settings;
     }
 
-    private boolean isUsingUnifiedChannelizer() {
-        return server.getServerGremlinExecutor().
-                getSettings().channelizer.equals(UnifiedChannelizer.class.getName());
-    }
-
     private static void clearNeo4j(Settings settings) {
         deleteDirectory(new File("/tmp/neo4j"));
         settings.graphs.put("graph", "conf/neo4j-empty.properties");
@@ -179,6 +177,7 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
 
         // should get an error because "x" is not defined as this is a new session
         try {
+            clientReconnect.submit("y=100").all().join();
             clientReconnect.submit("x").all().join();
             fail("Should not have been successful as 'x' was only defined in the old session");
         } catch(Exception ex) {
@@ -188,6 +187,12 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
 
         // the commit from client1 should not have gone through so there should be no data present.
         assertEquals(0, clientReconnect.submit("graph.traversal().V().count()").all().join().get(0).getInt());
+
+        // must turn on maintainStateAfterException for unified channelizer
+        if (!isUsingUnifiedChannelizer()) {
+            assertEquals(100, clientReconnect.submit("y").all().join().get(0).getInt());
+        }
+
         clusterReconnect.close();
 
         if (isUsingUnifiedChannelizer()) {
@@ -196,6 +201,49 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
     }
 
     @Test
+    public void shouldCloseSessionOnClientCloseWithStateMaintainedBetweenExceptions() throws Exception {
+        assumeNeo4jIsPresent();
+        assumeThat("Must use UnifiedChannelizer", isUsingUnifiedChannelizer(), is(true));
+
+        final Cluster cluster1 = TestClientFactory.open();
+        final Client client1 = cluster1.connect(name.getMethodName());
+        client1.submit("x = 1").all().join();
+        client1.submit("graph.addVertex()").all().join();
+        client1.close();
+        cluster1.close();
+
+        assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+
+        // try to reconnect to that session and make sure no state is there
+        final Cluster clusterReconnect = TestClientFactory.open();
+
+        // this configures the client to behave like OpProcessor for UnifiedChannelizer
+        final Client.SessionSettings settings = Client.SessionSettings.build().
+                sessionId(name.getMethodName()).maintainStateAfterException(true).create();
+        final Client clientReconnect = clusterReconnect.connect(Client.Settings.build().useSession(settings).create());
+
+        // "y=100" must succeed, then evaluating "x" must fail because "x" was only defined in the old, closed session
+        try {
+            clientReconnect.submit("y=100").all().join();
+            clientReconnect.submit("x").all().join();
+            fail("Should not have been successful as 'x' was only defined in the old session");
+        } catch(Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root.getMessage(), startsWith("No such property"));
+        }
+
+        // client1 added a vertex but never committed before closing, so there should be no data present.
+        assertEquals(0, clientReconnect.submit("graph.traversal().V().count()").all().join().get(0).getInt());
+
+        // since maintainStateAfterException is enabled the UnifiedChannelizer works like OpProcessor
+        assertEquals(100, clientReconnect.submit("y").all().join().get(0).getInt());
+
+        clusterReconnect.close();
+
+        assertEquals(0, ((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().getActiveSessionCount());
+    }
+
+    @Test
     public void shouldUseGlobalFunctionCache() throws Exception {
         final Cluster cluster = TestClientFactory.open();
         final Client session = cluster.connect(name.getMethodName());
@@ -280,8 +328,11 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             client.submit("graph.addVertex(); graph.tx().commit()").all().get();
         }
 
+
         // the transaction is managed so a rollback should have executed
         assertEquals(1, client.submit("g.V().count()").all().get().get(0).getInt());
+
+        cluster.close();
     }
 
     @Test
diff --git a/gremlin-server/src/test/resources/log4j-test.properties b/gremlin-server/src/test/resources/log4j-test.properties
index a9aeddd..b50636b 100644
--- a/gremlin-server/src/test/resources/log4j-test.properties
+++ b/gremlin-server/src/test/resources/log4j-test.properties
@@ -20,5 +20,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%p] %C - %m%n
 
+log4j.logger.org.apache.tinkerpop.gremlin.server.AbstractChannelizer=ERROR
 log4j.logger.org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor=ERROR
+log4j.logger.org.apache.tinkerpop.gremlin.server.handler.MultiRexster=ERROR
+log4j.logger.org.apache.tinkerpop.gremlin.server.handler.SingleRexster=ERROR
 log4j.logger.audit.org.apache.tinkerpop.gremlin.server=INFO