Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2021/04/05 16:58:42 UTC

[tinkerpop] branch TINKERPOP-2245 updated (dd55326 -> 26f45af)

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a change to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git.


 discard dd55326  TINKERPOP-2245 gremlinPool needs to be defaulted to a larger value
 discard 371d6da  TINKERPOP-2245 Changed logging for sessions
 discard 7b92c4c  Closing the alias needs to close the underlying client with it
 discard 5664b93  Ignored shouldEventuallySucceedOnSameServerWithDefault
 discard db47873  TINKERPOP-2245 Refactored the session side of UnifiedChannelizer
 discard 080d165  TINKERPOP-2245 Added UnifiedChannelizer
     add 3f9cc71  Minor documentation fix - bad file name. CTR
     add 1f7e65d  Merge branch '3.4-dev'
     add 289b980  TINKERPOP-2544: Modify site publishing scripts to include gremlint
     add 5c75978  Merge pull request #1412 from apache/TINKERPOP-2544
     new 0fa8f84  TINKERPOP-2245 Added UnifiedChannelizer
     new a19c286  TINKERPOP-2245 Refactored the session side of UnifiedChannelizer
     new c05c291  Ignored shouldEventuallySucceedOnSameServerWithDefault
     new 7913b14  Closing the alias needs to close the underlying client with it
     new 38aa0d2  TINKERPOP-2245 Changed logging for sessions
     new 6b32efd  TINKERPOP-2245 gremlinPool needs to be defaulted to a larger value
     new 26f45af  TINKERPOP-2245 Documentation updates around UnifiedChannelizer

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (dd55326)
            \
             N -- N -- N   refs/heads/TINKERPOP-2245 (26f45af)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 bin/generate-home.sh                               |  9 ++++
 docs/gremlint/package.json                         |  2 +-
 .../dev/developer/development-environment.asciidoc |  6 +++
 docs/src/dev/developer/for-committers.asciidoc     |  9 +++-
 docs/src/reference/gremlin-applications.asciidoc   | 40 +++++++++++++----
 docs/src/reference/implementations-neo4j.asciidoc  |  2 +-
 docs/src/upgrade/release-3.5.x.asciidoc            | 51 ++++++++++++++--------
 7 files changed, 90 insertions(+), 29 deletions(-)

[tinkerpop] 06/07: TINKERPOP-2245 gremlinPool needs to be defaulted to a larger value

Posted by sp...@apache.org.

spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 6b32efd77c4c308d467ec4dbb12299a943adccd8
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Fri Apr 2 06:07:34 2021 -0400

    TINKERPOP-2245 gremlinPool needs to be defaulted to a larger value
    
    For environments like travis which might only have two cores there is a chance for tests to fail/not finish as they introduce blocks in requests that need to exist for tests to pass. If there aren't enough threads in the pool the requests can't be serviced properly and they fail.
---
 .travis.yml                                                             | 2 +-
 docker/gremlin-server/gremlin-server-integration-krb5.yaml              | 1 +
 docker/gremlin-server/gremlin-server-integration-secure.yaml            | 1 +
 docker/gremlin-server/gremlin-server-integration.yaml                   | 1 +
 .../org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml | 2 ++
 5 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/.travis.yml b/.travis.yml
index ff3a058..b8b02a7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -60,7 +60,7 @@ jobs:
       name: "gremlin server"
     - script:
         - "mvn clean install -q -DskipTests -Dci"
-        - "travis_wait 60 mvn verify -pl :gremlin-server -DskipTests -DskipIntegrationTests=false -DincludeNeo4j -DtestUnified=true"
+        - "mvn verify -pl :gremlin-server -DskipTests -DskipIntegrationTests=false -DincludeNeo4j -DtestUnified=true"
       name: "gremlin server - unified"
     - script:
         - "mvn clean install -q -DskipTests -Dci"
diff --git a/docker/gremlin-server/gremlin-server-integration-krb5.yaml b/docker/gremlin-server/gremlin-server-integration-krb5.yaml
index 16bad46..cb7e737 100644
--- a/docker/gremlin-server/gremlin-server-integration-krb5.yaml
+++ b/docker/gremlin-server/gremlin-server-integration-krb5.yaml
@@ -48,6 +48,7 @@ processors:
   - { className: org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor, config: {}}
 metrics: {
   slf4jReporter: {enabled: true, interval: 180000}}
+gremlinPool: 8
 strictTransactionManagement: false
 idleConnectionTimeout: 0
 keepAliveInterval: 0
diff --git a/docker/gremlin-server/gremlin-server-integration-secure.yaml b/docker/gremlin-server/gremlin-server-integration-secure.yaml
index 1cd3156..e6a999f 100644
--- a/docker/gremlin-server/gremlin-server-integration-secure.yaml
+++ b/docker/gremlin-server/gremlin-server-integration-secure.yaml
@@ -48,6 +48,7 @@ processors:
   - { className: org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor, config: {}}
 metrics: {
   slf4jReporter: {enabled: true, interval: 180000}}
+gremlinPool: 8
 strictTransactionManagement: false
 idleConnectionTimeout: 0
 keepAliveInterval: 0
diff --git a/docker/gremlin-server/gremlin-server-integration.yaml b/docker/gremlin-server/gremlin-server-integration.yaml
index 13c2d6c..7f56673 100644
--- a/docker/gremlin-server/gremlin-server-integration.yaml
+++ b/docker/gremlin-server/gremlin-server-integration.yaml
@@ -48,6 +48,7 @@ processors:
   - { className: org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor, config: {}}
 metrics: {
   slf4jReporter: {enabled: true, interval: 180000}}
+gremlinPool: 8
 strictTransactionManagement: false
 idleConnectionTimeout: 0
 keepAliveInterval: 0
diff --git a/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml b/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml
index 601e404..35a19d7 100644
--- a/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml
+++ b/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml
@@ -21,6 +21,7 @@
 # Changes to this file need to be appropriately replicated to
 #
 # - docker/gremlin-server/gremlin-server-integration.yaml
+# - docker/gremlin-server/gremlin-server-integration-krb5.yaml
 # - docker/gremlin-server/gremlin-server-integration-secure.yaml
 #
 # Without such changes, the test docker server can't be started for independent
@@ -61,6 +62,7 @@ processors:
   - { className: org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor, config: {}}
 metrics: {
   slf4jReporter: {enabled: true, interval: 180000}}
+gremlinPool: 8
 strictTransactionManagement: false
 idleConnectionTimeout: 0
 keepAliveInterval: 0
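
Note on the commit above: the failure mode it describes can be reproduced outside of Gremlin Server with a plain fixed-size thread pool. The sketch below is illustrative only and is not Gremlin Server code. The class and method names (PoolSizingSketch, resolvePoolSize, runBlockingWorkload) are hypothetical. It assumes the documented rule that a gremlinPool of 0 falls back to Runtime.availableProcessors(), and it models "requests that block" as tasks waiting on a latch that only a later task can release.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolSizingSketch {

    // Hypothetical helper mirroring the documented rule: 0 means "use the core count".
    public static int resolvePoolSize(final int configured) {
        return configured == 0 ? Runtime.getRuntime().availableProcessors() : configured;
    }

    // Submits `blockers` tasks that wait on a latch, followed by one task that releases it.
    // Returns true only if every task finished within the timeout.
    public static boolean runBlockingWorkload(final int poolSize, final int blockers)
            throws InterruptedException {
        final ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        final CountDownLatch release = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(blockers + 1);
        try {
            for (int i = 0; i < blockers; i++) {
                pool.submit(() -> {
                    try {
                        release.await(); // holds a pool thread until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    done.countDown();
                });
            }
            // This task can only run if a pool thread is still free.
            pool.submit(() -> {
                release.countDown();
                done.countDown();
            });
            return done.await(500, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow(); // interrupts any tasks that are still blocked
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println("pool size for 0: " + resolvePoolSize(0));
        System.out.println("2 threads, 2 blockers finished: " + runBlockingWorkload(2, 2));
        System.out.println("8 threads, 2 blockers finished: " + runBlockingWorkload(8, 2));
    }
}
```

With two threads, both are held by the blocking tasks and the releasing task never leaves the queue, so the workload times out. With eight threads it completes. That is the reason for pinning gremlinPool to 8 in the integration configurations rather than relying on the core count of the CI machine.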

[tinkerpop] 04/07: Closing the alias needs to close the underlying client with it

Posted by sp...@apache.org.

spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 7913b148121b5686e4bdd8809d8360ce4400a732
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Thu Apr 1 11:00:44 2021 -0400

    Closing the alias needs to close the underlying client with it
    
    This is a bit of a change in behavior but it's a necessary one. Perhaps aliasing should be refactored at some point to not need AliasClusteredClient as a proxy. The Client itself should probably just be configured with the required alias. CTR
---
 CHANGELOG.asciidoc                                 |  3 +-
 .../apache/tinkerpop/gremlin/driver/Client.java    | 16 +++++-----
 .../gremlin/server/GremlinDriverIntegrateTest.java | 10 +++----
 .../server/GremlinSessionTxIntegrateTest.java      | 35 +++++++++++++++++++++-
 4 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 041c278..37da741 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -23,7 +23,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 [[release-3-5-0]]
 === TinkerPop 3.5.0 (Release Date: NOT OFFICIALLY RELEASED YET)
 
-This release also includes changes from <<release-3-4-3, 3.4.3>>.
+This release also includes changes from <<release-3-4-11, 3.4.11>>.
 
 * Added `gremlin-language` module.
 * Allowed the possibility for the propagation of `null` as a `Traverser` in Gremlin.
@@ -41,6 +41,7 @@ This release also includes changes from <<release-3-4-3, 3.4.3>>.
 * Allowed additional arguments to `Client.submit()` in Javascript driver to enable setting of parameters like `scriptEvaluationTimeout`.
 * Gremlin.Net driver no longer supports skipping deserialization by default. Users can however create their own `IMessageSerializer` if they need this functionality.
 * Supported deserialization of `dict` and `list` as a key in a `dict` for Python.
+* Changed the aliased `Client` to proxy `close()` methods to its underlying client.
 * Added support for remote `g.tx()` usage.
 * Added support for bytecode-based sessions.
 * Added a `Graph.Feature` for `supportsNullPropertyValues`.
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 3ad22cf..4a38951 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -360,7 +360,7 @@ public abstract class Client {
      * A low-level method that allows the submission of a manually constructed {@link RequestMessage}.
      */
     public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
-        if (isClosing()) throw new IllegalStateException("Client has been closed");
+        if (isClosing()) throw new IllegalStateException("Client is closed");
 
         if (!initialized)
             init();
@@ -655,19 +655,19 @@ public abstract class Client {
             return client.chooseConnection(msg);
         }
 
-        /**
-         * Prevents messages from being sent from this {@code Client}. Note that calling this method does not call
-         * close on the {@code Client} that created it.
-         */
+        @Override
+        public void close() {
+            client.close();
+        }
+
         @Override
         public synchronized CompletableFuture<Void> closeAsync() {
-            close.complete(null);
-            return close;
+            return client.closeAsync();
         }
 
         @Override
         public boolean isClosing() {
-            return close.isDone();
+            return client.isClosing();
         }
 
         /**
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index f006cc4..7e09c95 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -1702,7 +1702,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
             assertThat(root, instanceOf(IllegalStateException.class));
-            assertEquals("Client has been closed", root.getMessage());
+            assertEquals("Client is closed", root.getMessage());
         }
 
         try {
@@ -1711,7 +1711,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
             assertThat(root, instanceOf(IllegalStateException.class));
-            assertEquals("Client has been closed", root.getMessage());
+            assertEquals("Client is closed", root.getMessage());
         }
 
         try {
@@ -1720,7 +1720,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
             assertThat(root, instanceOf(IllegalStateException.class));
-            assertEquals("Client has been closed", root.getMessage());
+            assertEquals("Client is closed", root.getMessage());
         }
 
         try {
@@ -1729,7 +1729,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
             assertThat(root, instanceOf(IllegalStateException.class));
-            assertEquals("Client has been closed", root.getMessage());
+            assertEquals("Client is closed", root.getMessage());
         }
 
         try {
@@ -1738,7 +1738,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
             assertThat(root, instanceOf(IllegalStateException.class));
-            assertEquals("Client has been closed", root.getMessage());
+            assertEquals("Client is closed", root.getMessage());
         }
 
         // allow call to close() even though closed through cluster
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java
index ae8817f..534f867 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.junit.Test;
 
 import java.io.File;
@@ -188,7 +189,7 @@ public class GremlinSessionTxIntegrateTest extends AbstractGremlinServerIntegrat
 
         gtx.addV("person").iterate();
         assertEquals(1, (long) gtx.V().count().next());
-        gtx.tx().close();
+        gtx.close();
         assertThat(gtx.tx().isOpen(), is(false));
 
         // sessionless connections should still be good - close() should not affect that
@@ -207,6 +208,38 @@ public class GremlinSessionTxIntegrateTest extends AbstractGremlinServerIntegrat
     }
 
     @Test
+    public void shouldOpenAndCloseObsceneAmountOfSessions() throws Exception {
+        assumeNeo4jIsPresent();
+
+        final Cluster cluster = TestClientFactory.build().create();
+        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+        // need to open significantly more sessions than we have threads in gremlinPool
+        final int numberOfSessions = 500;
+        for (int ix = 0; ix < numberOfSessions; ix ++) {
+            final Transaction tx = g.tx();
+            final GraphTraversalSource gtx = tx.begin();
+            try {
+                final Vertex v1 = gtx.addV("person").property("pid", ix + "a").next();
+                final Vertex v2 = gtx.addV("person").property("pid", ix + "b").next();
+                gtx.addE("knows").from(v1).to(v2).iterate();
+                tx.commit();
+            } catch (Exception ex) {
+                tx.rollback();
+                fail("Should not expect any failures");
+            } finally {
+                assertThat(tx.isOpen(), is(false));
+            }
+        }
+
+        // sessionless connections should still be good - close() should not affect that
+        assertEquals(numberOfSessions * 2, (long) g.V().count().next());
+        assertEquals(numberOfSessions, (long) g.E().count().next());
+
+        cluster.close();
+    }
+
+    @Test
     public void shouldCommitTxBytecodeInSessionReusingGtxAcrossThreads() throws Exception {
         assumeNeo4jIsPresent();
 

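Note on the commit above: the behavior change in Client.java amounts to the aliased client delegating its close-related methods to the client it wraps, instead of tracking its own closed state. The sketch below shows that delegation pattern in isolation. It is not the gremlin-driver API: BaseClient and AliasedClient are hypothetical stand-ins, and submit() returning a String is an assumption made to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;

public class AliasCloseSketch {

    // Hypothetical stand-in for a driver client; not the gremlin-driver Client class.
    public static class BaseClient {
        private final CompletableFuture<Void> closing = new CompletableFuture<>();

        public CompletableFuture<Void> closeAsync() {
            closing.complete(null);
            return closing;
        }

        public void close() {
            closeAsync().join();
        }

        public boolean isClosing() {
            return closing.isDone();
        }

        public String submit(final String request) {
            if (isClosing()) throw new IllegalStateException("Client is closed");
            return "ok:" + request;
        }
    }

    // Proxy that adds an alias to each request and forwards close calls to the wrapped client.
    public static class AliasedClient extends BaseClient {
        private final BaseClient client;
        private final String alias;

        public AliasedClient(final BaseClient client, final String alias) {
            this.client = client;
            this.alias = alias;
        }

        @Override
        public CompletableFuture<Void> closeAsync() {
            return client.closeAsync(); // closing the alias closes the underlying client
        }

        @Override
        public void close() {
            client.close();
        }

        @Override
        public boolean isClosing() {
            return client.isClosing(); // closed state is shared, not tracked separately
        }

        @Override
        public String submit(final String request) {
            return client.submit(alias + ":" + request);
        }
    }

    public static void main(final String[] args) {
        final BaseClient base = new BaseClient();
        final AliasedClient aliased = new AliasedClient(base, "g");
        System.out.println(aliased.submit("V().count()"));
        aliased.close();
        System.out.println("underlying client closing: " + base.isClosing());
    }
}
```

The trade-off is the one the commit message points out: after this change an alias is no longer an independently closable handle, so code that closes an alias and keeps using the original client will now fail with "Client is closed".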
[tinkerpop] 07/07: TINKERPOP-2245 Documentation updates around UnifiedChannelizer

Posted by sp...@apache.org.

spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 26f45af09c6fef5885180b5b25893fa7689f8f1a
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Mon Apr 5 12:58:05 2021 -0400

    TINKERPOP-2245 Documentation updates around UnifiedChannelizer
---
 docs/src/reference/gremlin-applications.asciidoc | 40 +++++++++++++++----
 docs/src/upgrade/release-3.5.x.asciidoc          | 51 ++++++++++++++++--------
 2 files changed, 66 insertions(+), 25 deletions(-)

diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc
index bac1ba3..12b2770 100644
--- a/docs/src/reference/gremlin-applications.asciidoc
+++ b/docs/src/reference/gremlin-applications.asciidoc
@@ -832,6 +832,10 @@ channelizer: org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer
 [source,yaml]
 channelizer: org.apache.tinkerpop.gremlin.server.channel.WsAndHttpChannelizer
 
+NOTE: The `UnifiedChannelizer` introduced in 3.5.0 can also be used to support HTTP requests as its functionality
+is similar to `WsAndHttpChannelizer`. Please see the Gremlin Server UnifiedChannelizer Section of the Upgrade
+Documentation for 3.5.0 for more link:https://tinkerpop.apache.org/docs/current/upgrade/#_tinkerpop_3_5_0[details].
+
 The `HttpChannelizer` is already configured in the `gremlin-server-rest-modern.yaml` file that is packaged with the Gremlin
 Server distribution.  To utilize it, start Gremlin Server as follows:
 
@@ -966,7 +970,7 @@ The following table describes the various YAML configuration options that Gremli
 |authorization.config |A `Map` of configuration settings to be passed to the `Authorizer` when it is constructed.  The settings available are dependent on the implementation. |_none_
 |channelizer |The fully qualified classname of the `Channelizer` implementation to use.  A `Channelizer` is a "channel initializer" which Gremlin Server uses to define the type of processing pipeline to use.  By allowing different `Channelizer` implementations, Gremlin Server can support different communication protocols (e.g. WebSocket). |`WebSocketChannelizer`
 |enableAuditLog |The `AuthenticationHandler`, `AuthorizationHandler` and processors can issue audit logging messages with the authenticated user, remote socket address and requests with a gremlin query. For privacy reasons, the default value of this setting is false. The audit logging messages are logged at the INFO level via the `audit.org.apache.tinkerpop.gremlin.server` logger, which can be configured using the log4j.properties file. |_false_
-|graphManager |The fully qualified classname of the `GraphManager` implementation to use.  A `GraphManager` is a class that adheres to the TinkerPop `GraphManager` interface, allowing custom implementations for storing and managing graph references, as well as defining custom methods to open and close graphs instantiations. It is important to note that the TinkerPop HTTP and WebSocketChannelizers auto-commit and auto-rollback based on the graphs stored in the graphManager upon script exe [...]
+|graphManager |The fully qualified classname of the `GraphManager` implementation to use.  A `GraphManager` is a class that adheres to the TinkerPop `GraphManager` interface, allowing custom implementations for storing and managing graph references, as well as defining custom methods to open and close graph instantiations. To prevent Gremlin Server from starting when all graphs fail, the `CheckedGraphManager` can be used.|`DefaultGraphManager`
 |graphs |A `Map` of `Graph` configuration files where the key of the `Map` becomes the name to which the `Graph` will be bound and the value is the file name of a `Graph` configuration file. |_none_
 |gremlinPool |The number of "Gremlin" threads available to execute actual scripts in a `ScriptEngine`. This pool represents the workers available to handle blocking operations in Gremlin Server. When set to `0`, Gremlin Server will use the value provided by `Runtime.availableProcessors()`. |0
 |host |The name of the host to bind the server to. |localhost
@@ -977,6 +981,7 @@ The following table describes the various YAML configuration options that Gremli
 |maxContentLength |The maximum length of the aggregated content for a message.  Works in concert with `maxChunkSize` where chunked requests are accumulated back into a single message.  A request exceeding this size will return a `413 - Request Entity Too Large` status code.  A response exceeding this size will raise an internal exception. |65536
 |maxHeaderSize |The maximum length of all headers. |8192
 |maxInitialLineLength |The maximum length of the initial line (e.g.  "GET / HTTP/1.0") processed in a request, which essentially controls the maximum length of the submitted URI. |4096
+|maxParameters |The maximum number of parameters that can be passed on a request. Larger numbers may impact performance for scripts. This configuration only applies to the `UnifiedChannelizer`. |16
 |metrics.consoleReporter.enabled |Turns on console reporting of metrics. |false
 |metrics.consoleReporter.interval |Time in milliseconds between reports of metrics to console. |180000
 |metrics.csvReporter.enabled |Turns on CSV reporting of metrics. |false
@@ -1009,6 +1014,7 @@ The following table describes the various YAML configuration options that Gremli
 |serializers |A `List` of `Map` settings, where each `Map` represents a `MessageSerializer` implementation to use along with its configuration. If this value is not set, then Gremlin Server will configure with GraphSON and GraphBinary but will not register any `ioRegistries` for configured graphs. |_empty_
 |serializers[X].className |The full class name of the `MessageSerializer` implementation. |_none_
 |serializers[X].config |A `Map` containing `MessageSerializer` specific configurations. |_none_
+|sessionLifetimeTimeout |The maximum time in milliseconds that a session can exist. This lifetime is fixed when the session starts and cannot be extended, irrespective of the number of requests and their individual timeouts. This configuration only applies to the `UnifiedChannelizer`. |600000 (10 minutes)
 |ssl.enabled |Determines if SSL is turned on or not. |false
 |ssl.keyStore |The private key in JKS or PKCS#12 format.  |_none_
 |ssl.keyStorePassword |The password of the `keyStore` if it is password-protected. |_none_
@@ -1021,7 +1027,9 @@ The following table describes the various YAML configuration options that Gremli
 |strictTransactionManagement |Set to `true` to require `aliases` to be submitted on every requests, where the `aliases` become the scope of transaction management. |false
 |threadPoolBoss |The number of threads available to Gremlin Server for accepting connections. Should always be set to `1`. |1
 |threadPoolWorker |The number of threads available to Gremlin Server for processing non-blocking reads and writes. |1
-|useEpollEventLoop |try to use epoll event loops (works only on Linux os) instead of netty NIO. |false
+|useCommonEngineForSessions |Ensures that the same `ScriptEngine` is used to support sessions and sessionless requests which will lead to better performance. Do not change this setting from the default without a specific use case in mind. This configuration only applies to the `UnifiedChannelizer`. |true
+|useEpollEventLoop |Try to use epoll event loops (works only on Linux os) instead of netty NIO. |false
+|useGlobalFunctionCacheForSessions |Enable the global function cache for sessions when using the `UnifiedChannelizer`. When `true` it means that functions created in one request to a session remain available on the next request to that session. This setting is only relevant when `useCommonEngineForSessions` is `false`. |true
 |writeBufferHighWaterMark | If the number of bytes in the network send buffer exceeds this value then the channel is no longer writeable, accepting no additional writes until buffer is drained and the `writeBufferLowWaterMark` is met. |65536
 |writeBufferLowWaterMark | Once the number of bytes queued in the network send buffer exceeds the `writeBufferHighWaterMark`, the channel will not become writeable again until the buffer is drained and it drops below this value. |65536
 |=========================================================
@@ -1031,6 +1039,9 @@ See the <<metrics,Metrics>> section for more information on how to configure Gan
 [[opprocessor-configurations]]
 ==== OpProcessor Configurations
 
+IMPORTANT: The `UnifiedChannelizer` does not rely on `OpProcessor` infrastructure. If using that channelizer, these
+configuration options can be ignored.
+
 An `OpProcessor` provides a way to plug-in handlers to Gremlin Server's processing flow. Gremlin Server uses this
 plug-in system itself to expose the packaged functionality that it exposes. Configurations can be supplied to an
 `OpProcessor` through the `processors` key in the Gremlin Server configuration file. Each `OpProcessor` can take a
@@ -2047,22 +2058,35 @@ expected workload. More discussion on this topic can be found in the <<parameter
 Section below.
 * When configuring the size of `threadPoolWorker` start with the default of `1` and increment by one as needed to a
 maximum of `2*number of cores`.
-* The "right" size of the `gremlinPool` setting is somewhat dependent on the type of scripts that will be processed
+* The "right" size of the `gremlinPool` setting is somewhat dependent on the type of requests that will be processed
 by Gremlin Server.  As requests arrive to Gremlin Server they are decoded and queued to be processed by threads in
 this pool.  When this pool is exhausted of threads, Gremlin Server will continue to accept incoming requests, but
 the queue will continue to grow.  If left to grow too large, the server will begin to slow.  When tuning around
 this setting, consider whether the bulk of the scripts being processed will be "fast" or "slow", where "fast"
 generally means being measured in the low hundreds of milliseconds and "slow" means anything longer than that.
-* Scripts that are "slow" can really hurt Gremlin Server if they are not properly accounted for.  `ScriptEngine`
-evaluations are blocking operations that aren't always easily interrupted, so once a "slow" script is being evaluated in
-the context of a `ScriptEngine` it must finish its work.  Lots of "slow" scripts will eventually consume the
-`gremlinPool` preventing other scripts from getting processed from the queue.
+* Requests that are "slow" can really hurt Gremlin Server if they are not properly accounted for. Since these requests
+block a thread until the job is complete or successfully interrupted, lots of long-running requests will eventually consume
+the `gremlinPool` preventing other requests from getting processed from the queue.
 ** To limit the impact of this problem, consider properly setting the `evaluationTimeout` to something "sane".
 In other words, test the traversals being sent to Gremlin Server and determine the maximum time they take to evaluate
-and iterate over results, then set the timeout value accordingly.
+and iterate over results, then set the timeout value accordingly. Also, consider setting a shorter global timeout for
+requests and then use longer per-request timeouts for the specific requests that are expected to take longer to execute.
 ** Note that `evaluationTimeout` can only attempt to interrupt the evaluation on timeout.  It allows Gremlin
 Server to "ignore" the result of that evaluation, which means the thread in the `gremlinPool` that did the evaluation
 may still be consumed after the timeout if interruption does not succeed on the thread.
+* When using sessions, there are different options to consider depending on the `Channelizer` implementation being
+used:
+** `WebSocketChannelizer` and `WsAndHttpChannelizer` - Both of these channelizers use the `gremlinPool` only for
+sessionless requests and construct a single threaded pool for each session created. In this way, these channelizers
+tend to optimize sessions to be long-lived. For short-lived sessions, which may be typical when using bytecode based
+remote transactions, quickly creating and destroying these sessions can be expensive. It is likely that there will be
+increased garbage collection times and frequency as well as a general increase in overall server processing.
+** `UnifiedChannelizer` - The threads of the `gremlinPool` are used to service both sessions and sessionless requests.
+With a common thread pool, this channelizer is a better choice when using lots of short-lived sessions as compared to
+`WebSocketChannelizer` and `WsAndHttpChannelizer`, because there is less cost in starting and stopping sessions. It is
+important though to understand the expected workload for the server and plan the size accordingly to ensure that the
+server does not need to wait for an extended period of time for a thread to be available to process the queue of
+incoming requests.
 * Graph element serialization for `Vertex` and `Edge` can be expensive, as their data structures are complex given the
 possible existence of multi-properties and meta-properties. When returning data from Gremlin Server only return the
 data that is required. For example, if only two properties of a `Vertex` are needed then simply return the two rather
diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index 0663891..1537ec9 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -410,23 +410,6 @@ these values are not hashable and will result in an error. By introducing a `Has
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-2395[TINKERPOP-2395],
 link:https://issues.apache.org/jira/browse/TINKERPOP-2407[TINKERPOP-2407]
 
-==== Gremlin Server UnifiedChannelizer
-
-Just some notes for later:
-
-* UnifiedChannelizer technically replaces all existing implementations but is not yet the default
-* Some new settings related to it: maxParameters, sessionLifeTimeout, useGlobalFunctionCacheForSessions, useCommonEngineForSessions
-* Session behavior shifts slightly under this channelizer for async calls, where a failure will mean that the session
-will close, remaining requests in the queue will be ignored and rollback will occur.
-* care should be take with strict transaction management and multi-graph transactions (which aren't real - not a new thing)
-* absolute max lifetime of a session is a new thing
-* transaction semantic under unified
-** user manually calls commit() commits transaction
-** user manually calls rollback()
-** user manually calls close() on Cluster
-** user manually calls close() on Tx or GraphTraversalSource spawned from Transaction
-** server error
-
 ==== Gremlin Server Audit Logging
 
 The `authentication.enableAuditlog` configuration property is deprecated, but replaced by the `enableAuditLog` property
@@ -491,6 +474,40 @@ future releases on the 3.5.x line.
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-2537[TINKERPOP-2537],
 link:https://tinkerpop.apache.org/docs/current/reference/#transactions[Reference Documentation - Transactions]
 
+==== Gremlin Server UnifiedChannelizer
+
+Gremlin Server uses a `Channelizer` abstraction to configure different Netty pipelines which can then offer different
+server behaviors. Most commonly, users configure the `WebSocketChannelizer` to enable the websocket protocol to which
+the various language drivers can connect.
+
+TinkerPop 3.5.0 introduces a new `Channelizer` implementation called the `UnifiedChannelizer`. This channelizer is
+somewhat similar to the `WsAndHttpChannelizer` in that it combines websocket and standard HTTP protocols in the server,
+but it provides a new and improved thread management approach as well as a more streamlined execution model. The
+`UnifiedChannelizer` technically replaces all existing implementations, but is not yet configured by default in Gremlin
+Server. To use it, modify the `channelizer` setting in the server yaml file as follows:
+
+[source,yaml]
+----
+channelizer: org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer
+----
+
+As the `UnifiedChannelizer` is tested further, it will eventually become the default implementation. It may, however,
+already be the preferred channelizer when using large numbers of short-lived sessions, as the threading model of the
+`UnifiedChannelizer` is better suited for such situations. If using this new channelizer, there are a few considerations
+to keep in mind:
+
+* The `UnifiedChannelizer` does not use the `OpProcessor` infrastructure, therefore those
+link:https://tinkerpop.apache.org/docs/3.5.0/reference/#opprocessor-configurations[configurations] are no longer
+relevant and can be ignored.
+
+* It is important to read about
+the `gremlinPool` setting in the link:https://tinkerpop.apache.org/docs/3.5.0/reference/#_tuning[Tuning Section] of
+the reference documentation and to look into the link:https://tinkerpop.apache.org/docs/3.5.0/reference/#_configuring_2[new configurations]
+available related to this channelizer: `maxParameters`, `sessionLifetimeTimeout`, `useGlobalFunctionCacheForSessions`, and
+`useCommonEngineForSessions`.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-2245[TINKERPOP-2245]
+
 ==== Retry Conditions
 
 Some error conditions are temporary in nature and therefore an operation that ends in such a situation may be tried
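
The `gremlinPool` sizing advice in the documentation above can be illustrated with a minimal sketch. It assumes that each open session holds one pool thread for as long as the session lives, which is a simplification drawn from the session worker loop and not a statement about the exact server internals. `SharedPoolSketch` and `run` are hypothetical names, and a plain `java.util.concurrent` fixed pool stands in for the `gremlinPool`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; none of these names exist in TinkerPop.
class SharedPoolSketch {

    // Opens as many long-lived "sessions" as there are pool threads, then submits one
    // more request and reports whether that request had to wait for a free thread.
    static String run(final int poolSize) {
        final ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        final CountDownLatch sessionsMayClose = new CountDownLatch(1);
        try {
            // each "session" holds its thread until it is told to close
            for (int i = 0; i < poolSize; i++) {
                pool.submit(() -> {
                    sessionsMayClose.await();
                    return null;
                });
            }

            // a sessionless request arrives while every thread is held by a session
            final Future<String> request = pool.submit(() -> "done");

            boolean waitedForThread = false;
            try {
                request.get(200, TimeUnit.MILLISECONDS);
            } catch (TimeoutException te) {
                waitedForThread = true; // still queued: no thread was free
            }

            // closing the sessions frees threads, so the queued request can now run
            sessionsMayClose.countDown();
            return "waitedForThread=" + waitedForThread + " result=" + request.get();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        System.out.println(run(2));
    }
}
```

Under this assumption, a pool that is no larger than the number of concurrently open sessions leaves nothing for other requests, which is why the expected number of concurrent sessions matters when choosing the pool size.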

[tinkerpop] 05/07: TINKERPOP-2245 Changed logging for sessions

Posted by sp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 38aa0d2e466f554543996b122d9e2767cfe2bb00
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Thu Apr 1 13:44:48 2021 -0400

    TINKERPOP-2245 Changed logging for sessions
    
    With remote transactions there will be many more sessions created/destroyed. It would be excessive to log them all - changed to debug level.
---
 .../gremlin/server/handler/MultiRexster.java          |  2 +-
 .../tinkerpop/gremlin/server/op/session/Session.java  |  8 ++++----
 .../server/GremlinServerSessionIntegrateTest.java     | 19 +++++++++++--------
 .../gremlin/server/GremlinSessionTxIntegrateTest.java |  5 +++--
 4 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
index a54b905..ceba2a2 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
@@ -178,7 +178,7 @@ public class MultiRexster extends AbstractRexster {
         ending.set(true);
         cancelRequestTimeout();
         super.close();
-        logger.info("Session {} closed", getSessionId());
+        logger.debug("Session {} closed", getSessionId());
     }
 
     private void cancelRequestTimeout() {
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
index 8d4fe3f..f7934d5 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
@@ -93,7 +93,7 @@ public class Session {
     private final ConcurrentHashMap<String, Session> sessions;
 
     public Session(final String session, final Context context, final ConcurrentHashMap<String, Session> sessions) {
-        logger.info("New session established for {}", session);
+        logger.debug("New session established for {}", session);
         this.session = session;
         this.bindings = new SimpleBindings();
         this.settings = context.getSettings();
@@ -201,7 +201,7 @@ public class Session {
                     try {
                         executor.submit(() -> {
                             if (g.tx().isOpen()) {
-                                logger.info("Rolling back open transactions on {} before killing session: {}", gName, session);
+                                logger.debug("Rolling back open transactions on {} before killing session: {}", gName, session);
                                 g.tx().rollback();
                             }
                         }).get(configuredPerGraphCloseTimeout, TimeUnit.MILLISECONDS);
@@ -211,7 +211,7 @@ public class Session {
                 }
             });
         } else {
-            logger.info("Skipped attempt to close open graph transactions on {} - close was forced", session);
+            logger.debug("Skipped attempt to close open graph transactions on {} - close was forced", session);
         }
 
         // prevent any additional requests from processing. if the kill was not "forced" then jobs were scheduled to
@@ -225,7 +225,7 @@ public class Session {
         // once a session is dead release the gauges in the registry for it
         MetricManager.INSTANCE.getRegistry().removeMatching((s, metric) -> s.contains(session));
 
-        logger.info("Session {} closed", session);
+        logger.debug("Session {} closed", session);
     }
 
     private GremlinExecutor.Builder initializeGremlinExecutor() {
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 6d07558..28adc61 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -33,6 +33,8 @@ import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.driver.simple.SimpleClient;
 import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.OpSelectorHandler;
+import org.apache.tinkerpop.gremlin.server.op.session.Session;
 import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.junit.After;
@@ -74,7 +76,8 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             case "shouldCloseSessionOnceOnRequest":
             case "shouldHaveTheSessionTimeout":
             case "shouldCloseSessionOnClientClose":
-                Logger.getRootLogger().setLevel(Level.INFO);
+                final org.apache.log4j.Logger sessionLogger = org.apache.log4j.Logger.getLogger(Session.class);
+                sessionLogger.setLevel(Level.DEBUG);
                 break;
         }
         rootLogger.addAppender(recordingAppender);
@@ -82,9 +85,9 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
 
     @After
     public void teardownForEachTest() {
-        final Logger rootLogger = Logger.getRootLogger();
-        rootLogger.setLevel(originalLevel);
-        rootLogger.removeAppender(recordingAppender);
+        final org.apache.log4j.Logger sessionLogger = org.apache.log4j.Logger.getLogger(Session.class);
+        sessionLogger.setLevel(originalLevel);
+        sessionLogger.removeAppender(recordingAppender);
     }
 
     /**
@@ -167,8 +170,8 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         if (isUsingUnifiedChannelizer()) {
             assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
         } else {
-            assertThat(recordingAppender.getMessages(), hasItem("INFO - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
-            assertThat(recordingAppender.getMessages(), hasItem("INFO - Session shouldCloseSessionOnClientClose closed\n"));
+            assertThat(recordingAppender.getMessages(), hasItem("DEBUG - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
+            assertThat(recordingAppender.getMessages(), hasItem("DEBUG - Session shouldCloseSessionOnClientClose closed\n"));
         }
 
         // try to reconnect to that session and make sure no state is there
@@ -369,7 +372,7 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
         } else {
             assertEquals(1, recordingAppender.getMessages().stream()
-                    .filter(msg -> msg.equals("INFO - Session shouldCloseSessionOnceOnRequest closed\n")).count());
+                    .filter(msg -> msg.equals("DEBUG - Session shouldCloseSessionOnceOnRequest closed\n")).count());
         }
     }
 
@@ -409,7 +412,7 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         } else {
             // there will be one for the timeout and a second for closing the cluster
             assertEquals(2, recordingAppender.getMessages().stream()
-                    .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
+                    .filter(msg -> msg.equals("DEBUG - Session shouldHaveTheSessionTimeout closed\n")).count());
         }
     }
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java
index 534f867..89784f9 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java
@@ -214,8 +214,9 @@ public class GremlinSessionTxIntegrateTest extends AbstractGremlinServerIntegrat
         final Cluster cluster = TestClientFactory.build().create();
         final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
 
-        // need to open significantly more sessions that we have threads in gremlinPool
-        final int numberOfSessions = 500;
+        // need to open significantly more sessions than we have threads in gremlinPool. if we go too obscene on
+        // OpProcessor this test will take too long
+        final int numberOfSessions = isUsingUnifiedChannelizer() ? 1000 : 100;
         for (int ix = 0; ix < numberOfSessions; ix ++) {
             final Transaction tx = g.tx();
             final GraphTraversalSource gtx = tx.begin();

[tinkerpop] 02/07: TINKERPOP-2245 Refactored the session side of UnifiedChannelizer

Posted by sp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit a19c2867c4d312a64a9f8cdb4c182ab1304d09c2
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Wed Mar 31 14:33:31 2021 -0400

    TINKERPOP-2245 Refactored the session side of UnifiedChannelizer
    
    MultiRexster didn't have quite the same semantics as the SessionOpProcessor. Had to add in an option to configure the MultiRexster to work that way. Also sorted out some issues with timeouts with sessions which required a RexsterExecutorService so that a Thread processing a Rexster could be interrupted independently of the Future as calling cancel() on the Future is something that can only be done once and if you wish to preserve state between failures of requests in a session (i.e. t [...]
---
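
The commit message above notes that `cancel()` on a `Future` can only be done once, while a session may need to survive several failed requests. The following is a minimal sketch of that distinction using only `java.util.concurrent`. It is not the `RexsterExecutorService` added by this commit; `InterruptVersusCancelSketch`, `run`, and the sleeping worker are hypothetical stand-ins.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; this is not the RexsterExecutorService from the commit.
class InterruptVersusCancelSketch {

    static String run(final int interrupts) {
        final AtomicInteger survived = new AtomicInteger();
        final AtomicBoolean closing = new AtomicBoolean(false);
        final Semaphore handled = new Semaphore(0);
        final CountDownLatch started = new CountDownLatch(1);

        // stands in for a session worker that keeps its state across failed requests
        final FutureTask<Void> task = new FutureTask<>(() -> {
            started.countDown();
            while (true) {
                try {
                    Thread.sleep(60_000); // stands in for processing a request
                } catch (InterruptedException ie) {
                    if (closing.get()) return null; // the session itself is ending
                    survived.incrementAndGet();     // one request failed, session lives on
                    handled.release();
                }
            }
        });

        final Thread worker = new Thread(task, "session-worker");
        worker.setDaemon(true);
        worker.start();
        try {
            started.await();
            // interrupting the thread directly can be repeated, once per failed request
            for (int i = 0; i < interrupts; i++) {
                worker.interrupt();
                handled.acquire();
            }
            // cancelling the future is terminal: only the first call has any effect
            closing.set(true);
            final boolean firstCancel = task.cancel(true);
            final boolean secondCancel = task.cancel(true);
            worker.join();
            return "survived=" + survived.get() + " firstCancel=" + firstCancel
                    + " secondCancel=" + secondCancel;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
    }

    public static void main(final String[] args) {
        System.out.println(run(3));
    }
}
```

Calling `run(3)` reports three interrupts that the worker survived, a first `cancel(true)` that succeeds, and a second that returns `false` because the task is already cancelled.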
 docs/src/upgrade/release-3.5.x.asciidoc            |  6 ++
 .../apache/tinkerpop/gremlin/driver/Client.java    | 19 ++++++
 .../apache/tinkerpop/gremlin/driver/Tokens.java    |  7 ++
 .../apache/tinkerpop/gremlin/server/Context.java   |  4 ++
 .../gremlin/server/handler/AbstractRexster.java    | 77 ++++++++++++++++------
 .../gremlin/server/handler/MultiRexster.java       | 59 ++++++++++++-----
 .../tinkerpop/gremlin/server/handler/Rexster.java  |  5 ++
 .../gremlin/server/handler/UnifiedHandler.java     | 38 +++++++++--
 .../server/util/RexsterExecutorService.java        | 50 ++++++++++++++
 .../gremlin/server/util/RexsterFutureTask.java     | 44 +++++++++++++
 .../gremlin/server/util/ServerGremlinExecutor.java | 14 +++-
 .../AbstractGremlinServerIntegrationTest.java      |  5 ++
 .../gremlin/server/GremlinDriverIntegrateTest.java | 47 +++----------
 .../gremlin/server/GremlinServerIntegrateTest.java |  4 ++
 .../server/GremlinServerSessionIntegrateTest.java  | 61 +++++++++++++++--
 .../src/test/resources/log4j-test.properties       |  3 +
 16 files changed, 357 insertions(+), 86 deletions(-)

diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index 3b99d58..0663891 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -420,6 +420,12 @@ Just some notes for later:
 will close, remaining requests in the queue will be ignored and rollback will occur.
 * care should be take with strict transaction management and multi-graph transactions (which aren't real - not a new thing)
 * absolute max lifetime of a session is a new thing
+* transaction semantic under unified
+** user manually calls commit() commits transaction
+** user manually calls rollback()
+** user manually calls close() on Cluster
+** user manually calls close() on Tx or GraphTraversalSource spawned from Transaction
+** server error
 
 ==== Gremlin Server Audit Logging
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index ac14c85..3ad22cf 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -688,6 +688,7 @@ public abstract class Client {
     public final static class SessionedClient extends Client {
         private final String sessionId;
         private final boolean manageTransactions;
+        private final boolean maintainStateAfterException;
 
         private ConnectionPool connectionPool;
 
@@ -697,6 +698,7 @@ public abstract class Client {
             super(cluster, settings);
             this.sessionId = settings.getSession().get().sessionId;
             this.manageTransactions = settings.getSession().get().manageTransactions;
+            this.maintainStateAfterException = settings.getSession().get().maintainStateAfterException;
         }
 
         /**
@@ -714,6 +716,7 @@ public abstract class Client {
             builder.processor("session");
             builder.addArg(Tokens.ARGS_SESSION, sessionId);
             builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions);
+            builder.addArg(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION, maintainStateAfterException);
             return builder;
         }
 
@@ -838,11 +841,13 @@ public abstract class Client {
         private final boolean manageTransactions;
         private final String sessionId;
         private final boolean forceClosed;
+        private final boolean maintainStateAfterException;
 
         private SessionSettings(final Builder builder) {
             manageTransactions = builder.manageTransactions;
             sessionId = builder.sessionId;
             forceClosed = builder.forceClosed;
+            maintainStateAfterException = builder.maintainStateAfterException;
         }
 
         /**
@@ -867,6 +872,10 @@ public abstract class Client {
             return forceClosed;
         }
 
+        public boolean maintainStateAfterException() {
+            return maintainStateAfterException;
+        }
+
         public static SessionSettings.Builder build() {
             return new SessionSettings.Builder();
         }
@@ -875,11 +884,21 @@ public abstract class Client {
             private boolean manageTransactions = false;
             private String sessionId = UUID.randomUUID().toString();
             private boolean forceClosed = false;
+            private boolean maintainStateAfterException = false;
 
             private Builder() {
             }
 
             /**
+             * If enabled, errors related to individual request timeouts or errors during processing will not result
+             * in a close of the session itself.
+             */
+            public Builder maintainStateAfterException(final boolean maintainStateAfterException) {
+                this.maintainStateAfterException = maintainStateAfterException;
+                return this;
+            }
+
+            /**
              * If enabled, transactions will be "managed" such that each request will represent a complete transaction.
              * By default this value is {@code false}.
              */
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
index 5617127..ba61e46 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
@@ -78,6 +78,13 @@ public final class Tokens {
     public static final String ARGS_HOST = "host";
     public static final String ARGS_SESSION = "session";
     public static final String ARGS_MANAGE_TRANSACTION = "manageTransaction";
+
+    /**
+     * Argument name that is intended to be used with a session which when its value is {@code true} makes it so
+     * that a processing error or request timeout will not close the session, but leave it to continue processing in
+     * whatever state it may hold.
+     */
+    public static final String ARGS_MAINTAIN_STATE_AFTER_EXCEPTION = "maintainStateAfterException";
     public static final String ARGS_SASL = "sasl";
     public static final String ARGS_SASL_MECHANISM = "saslMechanism";
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index fcd2072..b77c883 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -76,6 +76,10 @@ public class Context {
         return requestTimeout;
     }
 
+    public boolean isFinalResponseWritten() {
+        return this.finalResponseWritten.get();
+    }
+
     public RequestContentType getRequestContentType() {
         return requestContentType;
     }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
index 7f7c51c..e135b6c 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
@@ -67,6 +67,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -84,36 +85,52 @@ public abstract class AbstractRexster implements Rexster, AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(AbstractRexster.class);
     private static final Logger auditLogger = LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME);
 
+    private final boolean sessionIdOnRequest;
     private final Channel initialChannel;
     private final boolean transactionManaged;
     private final String sessionId;
     private final AtomicReference<ScheduledFuture<?>> sessionCancelFuture = new AtomicReference<>();
     private final AtomicReference<Future<?>> sessionFuture = new AtomicReference<>();
-    private long actualTimeoutLength = 0;
-    private boolean actualTimeoutCausedBySession = false;
+    private long actualTimeoutLengthWhenClosed = 0;
+    private Thread sessionThread;
+    protected final boolean maintainStateAfterException;
+    protected final AtomicReference<CloseReason> closeReason = new AtomicReference<>();
     protected final GraphManager graphManager;
     protected final ConcurrentMap<String, Rexster> sessions;
     protected final Set<String> aliasesUsedByRexster = new HashSet<>();
 
+    protected enum CloseReason { UNDETERMINED, CHANNEL_CLOSED, SESSION_TIMEOUT, REQUEST_TIMEOUT, NORMAL }
+
     AbstractRexster(final Context gremlinContext, final String sessionId,
-                    final boolean transactionManaged, final ConcurrentMap<String, Rexster> sessions) {
+                    final boolean transactionManaged,
+                    final ConcurrentMap<String, Rexster> sessions) {
+        // this only applies to sessions
+        this.maintainStateAfterException = (boolean) gremlinContext.getRequestMessage().
+                optionalArgs(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION).orElse(false);
+        this.sessionIdOnRequest = gremlinContext.getRequestMessage().optionalArgs(Tokens.ARGS_SESSION).isPresent();
         this.transactionManaged = transactionManaged;
         this.sessionId = sessionId;
         this.initialChannel = gremlinContext.getChannelHandlerContext().channel();
 
         // close Rexster if the channel closes to cleanup and close transactions
         this.initialChannel.closeFuture().addListener(f -> {
-            // cancel session worker or it will keep waiting for items to appear in the session queue
-            final Future<?> sf = sessionFuture.get();
-            if (sf != null && !sf.isDone()) {
-                sf.cancel(true);
+            if (closeReason.compareAndSet(null, CloseReason.CHANNEL_CLOSED)) {
+                // cancel session worker or it will keep waiting for items to appear in the session queue
+                cancel(true);
+                close();
             }
-            close();
         });
         this.sessions = sessions;
         this.graphManager = gremlinContext.getGraphManager();
     }
 
+    protected synchronized void cancel(final boolean mayInterruptIfRunning) {
+        final FutureTask<?> sf = (FutureTask<?>) sessionFuture.get();
+        if (sf != null && !sf.isDone()) {
+            sf.cancel(mayInterruptIfRunning);
+        }
+    }
+
     public boolean isTransactionManaged() {
         return transactionManaged;
     }
@@ -126,18 +143,22 @@ public abstract class AbstractRexster implements Rexster, AutoCloseable {
         return channel == initialChannel;
     }
 
-    public long getActualTimeoutLength() {
-        return actualTimeoutLength;
+    public long getActualTimeoutLengthWhenClosed() {
+        return actualTimeoutLengthWhenClosed;
     }
 
-    public boolean isActualTimeoutCausedBySession() {
-        return actualTimeoutCausedBySession;
+    public Optional<CloseReason> getCloseReason() {
+        return Optional.ofNullable(closeReason.get());
     }
 
     public GremlinScriptEngine getScriptEngine(final Context context, final String language) {
         return context.getGremlinExecutor().getScriptEngineManager().getEngineByName(language);
     }
 
+    public void setSessionThread(final Thread runner) {
+        this.sessionThread = runner;
+    }
+
     @Override
     public void setSessionCancelFuture(final ScheduledFuture<?> f) {
         if (!sessionCancelFuture.compareAndSet(null, f))
@@ -156,9 +177,16 @@ public abstract class AbstractRexster implements Rexster, AutoCloseable {
         // for final cleanup
         final Future<?> f = sessionFuture.get();
         if (f != null && !f.isDone()) {
-            actualTimeoutCausedBySession = causedBySession;
-            actualTimeoutLength = timeout;
-            sessionFuture.get().cancel(true);
+            if (closeReason.compareAndSet(null, causedBySession ? CloseReason.SESSION_TIMEOUT : CloseReason.REQUEST_TIMEOUT)) {
+                actualTimeoutLengthWhenClosed = timeout;
+
+                // if caused by a session timeout for a session OR if it is a request timeout for a sessionless
+                // request then we can just straight cancel() the Rexster instance
+                if (causedBySession || !sessionIdOnRequest)
+                    cancel(true);
+                else
+                    sessionThread.interrupt();
+            }
         }
     }
 
@@ -224,11 +252,22 @@ public abstract class AbstractRexster implements Rexster, AutoCloseable {
         if (root instanceof InterruptedException ||
                 root instanceof TraversalInterruptedException ||
                 root instanceof InterruptedIOException) {
-            final String msg = actualTimeoutCausedBySession ?
-                    String.format("Session closed - %s - sessionLifetimeTimeout of %s ms exceeded", sessionId, actualTimeoutLength) :
-                    String.format("Evaluation exceeded timeout threshold of %s ms", actualTimeoutLength);
+            String msg = "Processing interrupted but the reason why was not known";
+            switch (getCloseReason().orElse(CloseReason.UNDETERMINED)) {
+                case CHANNEL_CLOSED:
+                    msg = "Processing interrupted because the channel was closed";
+                    break;
+                case SESSION_TIMEOUT:
+                    msg = String.format("Session closed - %s - sessionLifetimeTimeout of %s ms exceeded", sessionId, actualTimeoutLengthWhenClosed);
+                    break;
+                case REQUEST_TIMEOUT:
+                    msg = String.format("Evaluation exceeded timeout threshold of %s ms", actualTimeoutLengthWhenClosed);
+                    break;
+            }
+            final ResponseStatusCode code = closeReason.get() == CloseReason.SESSION_TIMEOUT || closeReason.get() == CloseReason.REQUEST_TIMEOUT ?
+                    ResponseStatusCode.SERVER_ERROR_TIMEOUT : ResponseStatusCode.SERVER_ERROR;
             throw new RexsterException(msg, root, ResponseMessage.build(gremlinContext.getRequestMessage())
-                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .code(code)
                     .statusMessage(msg).create());
         }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
index 539c16b..a54b905 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
@@ -87,7 +87,6 @@ public class MultiRexster extends AbstractRexster {
 
     @Override
     public void addTask(final Context gremlinContext) {
-        // todo: explicitly reject request???
         if (acceptingTasks())
             queue.offer(gremlinContext);
     }
@@ -96,33 +95,48 @@ public class MultiRexster extends AbstractRexster {
     public void run() {
         // there must be one item in the queue at least since addTask() gets called before the worker
         // is ever started
-        Context gremlinContext = queue.poll();
-        if (null == gremlinContext)
+        Context currentGremlinContext = queue.poll();
+        if (null == currentGremlinContext)
             throw new IllegalStateException(String.format("Worker has no initial context for session: %s", getSessionId()));
 
         try {
-            startTransaction(gremlinContext);
+            startTransaction(currentGremlinContext);
             try {
                 while (true) {
                     // schedule timeout for the current request from the queue
-                    final long seto = gremlinContext.getRequestTimeout();
+                    final long seto = currentGremlinContext.getRequestTimeout();
                     requestCancelFuture = scheduledExecutorService.schedule(
                             () -> this.triggerTimeout(seto, false),
                             seto, TimeUnit.MILLISECONDS);
 
-                    process(gremlinContext);
+                    // only stop processing stuff in the queue if this Rexster isn't configured to hold state between
+                    // exceptions (i.e. the old OpProcessor way) or if this Rexster is closing down by certain death
+                    // (i.e. channel close or lifetime session timeout)
+                    try {
+                        process(currentGremlinContext);
+                    } catch (RexsterException ex) {
+                        if (!maintainStateAfterException || closeReason.get() == CloseReason.CHANNEL_CLOSED ||
+                            closeReason.get() == CloseReason.SESSION_TIMEOUT) {
+                            throw ex;
+                        }
+
+                        // reset the close reason as we are maintaining state
+                        closeReason.set(null);
+
+                        logger.warn(ex.getMessage(), ex);
+                        currentGremlinContext.writeAndFlush(ex.getResponseMessage());
+                    }
 
                     // work is done within the timeout period so cancel it
                     cancelRequestTimeout();
 
-                    gremlinContext = queue.take();
+                    currentGremlinContext = queue.take();
                 }
             } catch (Exception ex) {
-                // stop accepting requests on this worker since it is heading to close()
-                ending.set(true);
+                stopAcceptingRequests();
 
                 // the current context gets its exception handled...
-                handleException(gremlinContext, ex);
+                handleException(currentGremlinContext, ex);
             }
         } catch (RexsterException rexex) {
             // remaining work items in the queue are ignored since this worker is closing. must send
@@ -134,7 +148,7 @@ public class MultiRexster extends AbstractRexster {
                         .code(ResponseStatusCode.SERVER_ERROR)
                         .statusMessage(String.format(
                                 "An earlier request [%s] failed prior to this one having a chance to execute",
-                                gremlinContext.getRequestMessage().getRequestId())).create());
+                                currentGremlinContext.getRequestMessage().getRequestId())).create());
             }
 
             // exception should trigger a rollback in the session. a more focused rollback may have occurred
@@ -142,15 +156,25 @@ public class MultiRexster extends AbstractRexster {
             // the request
             closeTransactionSafely(Transaction.Status.ROLLBACK);
 
-            logger.warn(rexex.getMessage(), rexex);
-            gremlinContext.writeAndFlush(rexex.getResponseMessage());
+            // the current context could already be completed with SUCCESS and we're just waiting for another
+            // one to show up while a timeout occurs or the channel closes. in these cases, this would be a valid
+            // close in all likelihood so there's no reason to log or alert the client as the client already has
+            // the best answer
+            if (!currentGremlinContext.isFinalResponseWritten()) {
+                logger.warn(rexex.getMessage(), rexex);
+                currentGremlinContext.writeAndFlush(rexex.getResponseMessage());
+            }
         } finally {
-            close();
+            // if this is a normal end to the session or if the session life timeout is exceeded then the
+            // session needs to be removed and everything cleaned up
+            if (closeReason.compareAndSet(null, CloseReason.NORMAL) || closeReason.get() == CloseReason.SESSION_TIMEOUT) {
+                close();
+            }
         }
     }
 
     @Override
-    public synchronized void close() {
+    public void close() {
         ending.set(true);
         cancelRequestTimeout();
         super.close();
@@ -162,6 +186,11 @@ public class MultiRexster extends AbstractRexster {
             requestCancelFuture.cancel(true);
     }
 
+    private void stopAcceptingRequests() {
+        ending.set(true);
+        cancel(true);
+    }
+
     @Override
     protected Bindings getWorkerBindings() throws RexsterException {
         if (null == bindings)
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
index 70869dd..41b39f6 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
@@ -50,6 +50,11 @@ public interface Rexster extends Runnable {
     void setSessionFuture(final Future<?> f);
 
     /**
+     * Sets a reference to the all powerful thread that is running this Rexster.
+     */
+    void setSessionThread(final Thread t);
+
+    /**
      * Provides a general way to tell Rexster that it has exceeded some timeout condition.
      */
     void triggerTimeout(final long timeout, boolean causedBySession);
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
index 7483062..5cad751 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
@@ -149,19 +149,19 @@ public class UnifiedHandler extends SimpleChannelInboundHandler<RequestMessage>
             if (sessions.containsKey(sessionId)) {
                 final Rexster rexster = sessions.get(sessionId);
 
-                // check if the session is still accepting requests - if not block further requests
-                if (!rexster.acceptingTasks()) {
-                    final String sessionClosedMessage = String.format(
-                            "Session %s is no longer accepting requests as it has been closed", sessionId);
+                // check if the session is bound to this channel, thus one client per session
+                if (!rexster.isBoundTo(gremlinContext.getChannelHandlerContext().channel())) {
+                    final String sessionClosedMessage = String.format("Session %s is not bound to the connecting client", sessionId);
                     final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
                             .statusMessage(sessionClosedMessage).create();
                     ctx.writeAndFlush(response);
                     return;
                 }
 
-                // check if the session is bound to this channel, thus one client per session
-                if (!rexster.isBoundTo(gremlinContext.getChannelHandlerContext().channel())) {
-                    final String sessionClosedMessage = String.format("Session %s is not bound to the connecting client", sessionId);
+                // check if the session is still accepting requests - if not block further requests
+                if (!rexster.acceptingTasks()) {
+                    final String sessionClosedMessage = String.format(
+                            "Session %s is no longer accepting requests as it has been closed", sessionId);
                     final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
                             .statusMessage(sessionClosedMessage).create();
                     ctx.writeAndFlush(response);
@@ -201,6 +201,30 @@ public class UnifiedHandler extends SimpleChannelInboundHandler<RequestMessage>
             throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
         }
 
+        if (message.optionalArgs(Tokens.ARGS_SESSION).isPresent()) {
+            final Optional<Object> mtx = message.optionalArgs(Tokens.ARGS_MANAGE_TRANSACTION);
+            if (mtx.isPresent() && !(mtx.get() instanceof Boolean)) {
+                final String msg = String.format("%s argument must be of type boolean", Tokens.ARGS_MANAGE_TRANSACTION);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            final Optional<Object> msae = message.optionalArgs(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION);
+            if (msae.isPresent() && !(msae.get() instanceof Boolean)) {
+                final String msg = String.format("%s argument must be of type boolean", Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+        } else {
+            if (message.optionalArgs(Tokens.ARGS_MANAGE_TRANSACTION).isPresent()) {
+                final String msg = String.format("%s argument only applies to requests made for sessions", Tokens.ARGS_MANAGE_TRANSACTION);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            if (message.optionalArgs(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION).isPresent()) {
+                final String msg = String.format("%s argument only applies to requests made for sessions", Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+        }
+
         if (message.optionalArgs(Tokens.ARGS_BINDINGS).isPresent()) {
             final Map bindings = (Map) message.getArgs().get(Tokens.ARGS_BINDINGS);
             if (IteratorUtils.anyMatch(bindings.keySet().iterator(), k -> null == k || !(k instanceof String))) {
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterExecutorService.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterExecutorService.java
new file mode 100644
index 0000000..1ab738c
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterExecutorService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.util;
+
+import org.apache.tinkerpop.gremlin.server.handler.Rexster;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A special {@code ThreadPoolExecutor} which will construct {@link RexsterFutureTask} instances and inject the
+ * current running thread into a {@link Rexster} instance if one is present.
+ */
+public class RexsterExecutorService extends ThreadPoolExecutor {
+
+    public RexsterExecutorService(final int nThreads, final ThreadFactory threadFactory) {
+        super(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
+        return new RexsterFutureTask<>(runnable, value);
+    }
+
+
+    @Override
+    protected void beforeExecute(final Thread t, final Runnable r) {
+        if (r instanceof RexsterFutureTask)
+            ((RexsterFutureTask<?>) r).getRexster().ifPresent(rex -> rex.setSessionThread(t));
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterFutureTask.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterFutureTask.java
new file mode 100644
index 0000000..6aebfdb
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/RexsterFutureTask.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.util;
+
+import org.apache.tinkerpop.gremlin.server.handler.Rexster;
+
+import java.util.Optional;
+import java.util.concurrent.FutureTask;
+
+/**
+ * A cancellable asynchronous operation with the added ability to get a {@code Rexster} instance if the
+ * {@code Runnable} for the task was of that type.
+ */
+public class RexsterFutureTask<V> extends FutureTask<V> {
+
+    private final Rexster rexster;
+
+    public RexsterFutureTask(final Runnable runnable, final  V result) {
+        super(runnable, result);
+
+        // hold an instance to the Rexster instance if it is of that type
+        this.rexster = runnable instanceof Rexster ? (Rexster) runnable : null;
+    }
+
+    public Optional<Rexster> getRexster() {
+        return Optional.ofNullable(this.rexster);
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
index 67a1608..0102956 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.server.Channelizer;
 import org.apache.tinkerpop.gremlin.server.GraphManager;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
 import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +70,8 @@ public class ServerGremlinExecutor {
      * {@code scheduleExecutorServiceClass} is set to {@code null} it will be created via
      * {@link Executors#newScheduledThreadPool(int, ThreadFactory)}.  If either of the {@link ExecutorService}
      * instances are supplied, the {@link Settings#gremlinPool} value will be ignored for the pool size.
+     *
+     * @param gremlinExecutorService Expects a RexsterExecutorService if using the {@link UnifiedChannelizer}.
      */
     public ServerGremlinExecutor(final Settings settings, final ExecutorService gremlinExecutorService,
                                  final ScheduledExecutorService scheduledExecutorService) {
@@ -93,8 +96,17 @@ public class ServerGremlinExecutor {
 
         if (null == gremlinExecutorService) {
             final ThreadFactory threadFactoryGremlin = ThreadFactoryUtil.create("exec-%d");
-            this.gremlinExecutorService = Executors.newFixedThreadPool(settings.gremlinPool, threadFactoryGremlin);
+
+            // RexsterExecutorService adds some important bits that are helpful to the UnifiedChannelizer, but
+            // using it should have no ill effect on the old OpProcessor stuff or on GremlinExecutor
+            // in general.
+            this.gremlinExecutorService = new RexsterExecutorService(settings.gremlinPool, threadFactoryGremlin);
         } else {
+            if (settings.channelizer.equals(UnifiedChannelizer.class.getName())) {
+                logger.error("The {} requires use of a {} for the GremlinExecutor but a {} was provided instead",
+                        settings.channelizer, RexsterExecutorService.class.getName(), gremlinExecutorService.getClass().getName());
+            }
+
             this.gremlinExecutorService = gremlinExecutorService;
         }
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
index deb1734..c443d6d 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
@@ -161,6 +161,11 @@ public abstract class AbstractGremlinServerIntegrationTest {
         OpLoader.reset();
     }
 
+    protected boolean isUsingUnifiedChannelizer() {
+        return server.getServerGremlinExecutor().
+                getSettings().channelizer.equals(UnifiedChannelizer.class.getName());
+    }
+
     public static boolean deleteDirectory(final File directory) {
         if (directory.exists()) {
             final File[] files = directory.listFiles();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index c11cad8..ce93582 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -42,7 +42,6 @@ import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
 import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.handler.OpExecutorHandler;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -100,7 +99,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.core.StringStartsWith.startsWith;
-import static org.junit.Assume.assumeThat;
 import static org.mockito.Mockito.verify;
 
 /**
@@ -1615,7 +1613,10 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.open();
 
         try {
-            final Client client = cluster.connect(name.getMethodName());
+            // this configures the client to behave like OpProcessor for UnifiedChannelizer
+            final Client.SessionSettings settings = Client.SessionSettings.build().
+                    sessionId(name.getMethodName()).maintainStateAfterException(true).create();
+            final Client client = cluster.connect(Client.Settings.build().useSession(settings).create());
 
             for (int index = 0; index < 50; index++) {
                 final CompletableFuture<ResultSet> first = client.submitAsync(
@@ -1647,48 +1648,16 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                 final CompletableFuture<List<Result>> futureThird = third.get().all();
                 final CompletableFuture<List<Result>> futureFourth = fourth.get().all();
 
-                // there is slightly different assertion logic with UnifiedChannelizer given differences in session
-                // behavior where UnfiedChannelizer sessions won't continue processing in the face of a timeout and
-                // a new session will need to be created
-                if (server.getServerGremlinExecutor().getSettings().channelizer.equals(UnifiedChannelizer.class.getName())) {
-                    // first timesout and the rest get SERVER_ERROR
-                    try {
-                        futureFirst.get();
-                        fail("Should have timed out");
-                    } catch (Exception ex) {
-                        final Throwable root = ExceptionUtils.getRootCause(ex);
-                        assertThat(root, instanceOf(ResponseException.class));
-                        assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode());
-                        assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
-                    }
-
-                    assertFutureTimeoutUnderUnified(futureSecond);
-                    assertFutureTimeoutUnderUnified(futureThird);
-                    assertFutureTimeoutUnderUnified(futureFourth);
-                } else {
-                    assertFutureTimeout(futureFirst);
-                    assertFutureTimeout(futureSecond);
-                    assertFutureTimeout(futureThird);
-                    assertFutureTimeout(futureFourth);
-                }
+                assertFutureTimeout(futureFirst);
+                assertFutureTimeout(futureSecond);
+                assertFutureTimeout(futureThird);
+                assertFutureTimeout(futureFourth);
             }
         } finally {
             cluster.close();
         }
     }
 
-    private void assertFutureTimeoutUnderUnified(final CompletableFuture<List<Result>> f) {
-        try {
-            f.get();
-            fail("Should have timed out");
-        } catch (Exception ex) {
-            final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root, instanceOf(ResponseException.class));
-            assertEquals(ResponseStatusCode.SERVER_ERROR, ((ResponseException) root).getResponseStatusCode());
-            assertThat(root.getMessage(), allOf(startsWith("An earlier request"), endsWith("failed prior to this one having a chance to execute")));
-        }
-    }
-
     private void assertFutureTimeout(final CompletableFuture<List<Result>> f) {
         try {
             f.get();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 270bed2..a170185 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -88,6 +88,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
 
 /**
  * Integration tests for server-side settings and processing.
@@ -365,6 +366,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
 
     @Test
     public void shouldProduceProperExceptionOnTimeout() throws Exception {
+        // this test will not work quite right on UnifiedChannelizer
+        assumeThat("Must use OpProcessor", isUsingUnifiedChannelizer(), is(false));
+
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect(name.getMethodName());
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index fe9ff99..6d07558 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -23,6 +23,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.RequestOptions;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
@@ -54,6 +55,7 @@ import static org.hamcrest.core.IsIterableContaining.hasItem;
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
 
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
@@ -107,6 +109,7 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
                 settings.sessionLifetimeTimeout = 3000L;
                 break;
             case "shouldCloseSessionOnClientClose":
+            case "shouldCloseSessionOnClientCloseWithStateMaintainedBetweenExceptions":
                 clearNeo4j(settings);
                 break;
             case "shouldEnsureSessionBindingsAreThreadSafe":
@@ -144,11 +147,6 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         return settings;
     }
 
-    private boolean isUsingUnifiedChannelizer() {
-        return server.getServerGremlinExecutor().
-                getSettings().channelizer.equals(UnifiedChannelizer.class.getName());
-    }
-
     private static void clearNeo4j(Settings settings) {
         deleteDirectory(new File("/tmp/neo4j"));
         settings.graphs.put("graph", "conf/neo4j-empty.properties");
@@ -179,6 +177,7 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
 
         // should get an error because "x" is not defined as this is a new session
         try {
+            clientReconnect.submit("y=100").all().join();
             clientReconnect.submit("x").all().join();
             fail("Should not have been successful as 'x' was only defined in the old session");
         } catch(Exception ex) {
@@ -188,6 +187,12 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
 
         // the commit from client1 should not have gone through so there should be no data present.
         assertEquals(0, clientReconnect.submit("graph.traversal().V().count()").all().join().get(0).getInt());
+
+        // must turn on maintainStateAfterException for unified channelizer
+        if (!isUsingUnifiedChannelizer()) {
+            assertEquals(100, clientReconnect.submit("y").all().join().get(0).getInt());
+        }
+
         clusterReconnect.close();
 
         if (isUsingUnifiedChannelizer()) {
@@ -196,6 +201,49 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
     }
 
     @Test
+    public void shouldCloseSessionOnClientCloseWithStateMaintainedBetweenExceptions() throws Exception {
+        assumeNeo4jIsPresent();
+        assumeThat("Must use UnifiedChannelizer", isUsingUnifiedChannelizer(), is(true));
+
+        final Cluster cluster1 = TestClientFactory.open();
+        final Client client1 = cluster1.connect(name.getMethodName());
+        client1.submit("x = 1").all().join();
+        client1.submit("graph.addVertex()").all().join();
+        client1.close();
+        cluster1.close();
+
+        assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+
+        // try to reconnect to that session and make sure no state is there
+        final Cluster clusterReconnect = TestClientFactory.open();
+
+        // this configures the client to behave like OpProcessor for UnifiedChannelizer
+        final Client.SessionSettings settings = Client.SessionSettings.build().
+                sessionId(name.getMethodName()).maintainStateAfterException(true).create();
+        final Client clientReconnect = clusterReconnect.connect(Client.Settings.build().useSession(settings).create());
+
+        // should get an error because "x" is not defined as this is a new session
+        try {
+            clientReconnect.submit("y=100").all().join();
+            clientReconnect.submit("x").all().join();
+            fail("Should not have been successful as 'x' was only defined in the old session");
+        } catch(Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root.getMessage(), startsWith("No such property"));
+        }
+
+        // the commit from client1 should not have gone through so there should be no data present.
+        assertEquals(0, clientReconnect.submit("graph.traversal().V().count()").all().join().get(0).getInt());
+
+        // since maintainStateAfterException is enabled the UnifiedChannelizer works like OpProcessor
+        assertEquals(100, clientReconnect.submit("y").all().join().get(0).getInt());
+
+        clusterReconnect.close();
+
+        assertEquals(0, ((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().getActiveSessionCount());
+    }
+
+    @Test
     public void shouldUseGlobalFunctionCache() throws Exception {
         final Cluster cluster = TestClientFactory.open();
         final Client session = cluster.connect(name.getMethodName());
@@ -280,8 +328,11 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             client.submit("graph.addVertex(); graph.tx().commit()").all().get();
         }
 
+
         // the transaction is managed so a rollback should have executed
         assertEquals(1, client.submit("g.V().count()").all().get().get(0).getInt());
+
+        cluster.close();
     }
 
     @Test
diff --git a/gremlin-server/src/test/resources/log4j-test.properties b/gremlin-server/src/test/resources/log4j-test.properties
index a9aeddd..b50636b 100644
--- a/gremlin-server/src/test/resources/log4j-test.properties
+++ b/gremlin-server/src/test/resources/log4j-test.properties
@@ -20,5 +20,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%p] %C - %m%n
 
+log4j.logger.org.apache.tinkerpop.gremlin.server.AbstractChannelizer=ERROR
 log4j.logger.org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor=ERROR
+log4j.logger.org.apache.tinkerpop.gremlin.server.handler.MultiRexster=ERROR
+log4j.logger.org.apache.tinkerpop.gremlin.server.handler.SingleRexster=ERROR
 log4j.logger.audit.org.apache.tinkerpop.gremlin.server=INFO
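
The MultiRexster change above makes a failed request either end the session or leave it running, depending on maintainStateAfterException. The following is a minimal sketch of that control flow only, not the server code: SessionLoopSketch, drain() and evaluate() are hypothetical names, requests are plain strings, and the real loop additionally handles request timeouts, channel close, session lifetime timeout and transaction rollback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionLoopSketch {

    // Hypothetical evaluator: any request starting with "bad" fails.
    static String evaluate(final String request) {
        if (request.startsWith("bad"))
            throw new IllegalStateException("could not evaluate " + request);
        return "result of " + request;
    }

    // Processes requests in order and returns one response per request.
    static List<String> drain(final List<String> requests, final boolean maintainStateAfterException) {
        final List<String> responses = new ArrayList<>();
        String failedRequest = null;
        for (final String request : requests) {
            if (failedRequest != null) {
                // the session is closing, so queued work is answered but never executed
                responses.add("SERVER_ERROR: An earlier request [" + failedRequest
                        + "] failed prior to this one having a chance to execute");
                continue;
            }
            try {
                responses.add("SUCCESS: " + evaluate(request));
            } catch (IllegalStateException ex) {
                // the failing request always gets its own error response
                responses.add("SERVER_ERROR: " + ex.getMessage());
                // only stop processing if state is not maintained between exceptions
                if (!maintainStateAfterException) failedRequest = request;
            }
        }
        return responses;
    }

    public static void main(final String[] args) {
        final List<String> requests = Arrays.asList("x=1", "bad", "x");
        drain(requests, true).forEach(System.out::println);
        drain(requests, false).forEach(System.out::println);
    }
}
```

With the flag set, the third request is still evaluated after the second one fails. Without it, the third request only receives the "earlier request failed" error, which matches the assertions removed from GremlinDriverIntegrateTest in this commit.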

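RexsterExecutorService and RexsterFutureTask in the diff above combine two ThreadPoolExecutor hooks: newTaskFor() wraps each submitted Runnable in a custom FutureTask, and beforeExecute() hands the worker thread to the wrapped Runnable. A self-contained sketch of the same pattern follows. ThreadAware, AwareFutureTask and AwareExecutor are hypothetical stand-ins for the Rexster types, and the thread factory argument is left out for brevity.

```java
import java.util.Optional;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ThreadAwareExecutorSketch {

    /** Hypothetical stand-in for Rexster: a Runnable that wants a reference to its thread. */
    interface ThreadAware extends Runnable {
        void setWorkerThread(Thread t);
    }

    /** Remembers the original Runnable if it is ThreadAware. */
    static class AwareFutureTask<V> extends FutureTask<V> {
        private final ThreadAware aware;

        AwareFutureTask(final Runnable runnable, final V result) {
            super(runnable, result);
            this.aware = runnable instanceof ThreadAware ? (ThreadAware) runnable : null;
        }

        Optional<ThreadAware> getAware() {
            return Optional.ofNullable(aware);
        }
    }

    /** Fixed-size pool that injects the worker thread before each task runs. */
    static class AwareExecutor extends ThreadPoolExecutor {
        AwareExecutor(final int nThreads) {
            super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
            return new AwareFutureTask<>(runnable, value);
        }

        @Override
        protected void beforeExecute(final Thread t, final Runnable r) {
            super.beforeExecute(t, r);
            if (r instanceof AwareFutureTask)
                ((AwareFutureTask<?>) r).getAware().ifPresent(a -> a.setWorkerThread(t));
        }
    }

    /** Returns true if the injected thread is the one that actually ran the task. */
    static boolean demo() {
        final AtomicReference<Thread> injected = new AtomicReference<>();
        final AtomicReference<Thread> observed = new AtomicReference<>();
        final ThreadAware task = new ThreadAware() {
            @Override
            public void setWorkerThread(final Thread t) { injected.set(t); }

            @Override
            public void run() { observed.set(Thread.currentThread()); }
        };

        final AwareExecutor executor = new AwareExecutor(1);
        try {
            executor.submit(task).get();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        } finally {
            executor.shutdown();
        }
        return injected.get() != null && injected.get() == observed.get();
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

Only submit() goes through newTaskFor(). A Runnable passed straight to execute() is not wrapped, so the instanceof check in beforeExecute() skips it and no thread is injected.
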
[tinkerpop] 03/07: Ignored shouldEventuallySucceedOnSameServerWithDefault

Posted by sp...@apache.org.

spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit c05c29123b5247fdba9d99e46f40a91990ef2712
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Thu Apr 1 07:06:30 2021 -0400

    Ignored shouldEventuallySucceedOnSameServerWithDefault
    
    This test is technically failing but it had some bad assertions that made it look like it was actually passing. CTR
---
 .../tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java       | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index ce93582..f006cc4 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -56,6 +56,7 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.hamcrest.core.IsInstanceOf;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -373,6 +374,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     }
 
     @Test
+    @Ignore("This test had some bad semantics that allowed it to pass even though it was technically failing")
     public void shouldEventuallySucceedOnSameServerWithDefault() throws Exception {
         stopServer();
 
@@ -390,6 +392,8 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
 
             startServer();
 
+            boolean succeedAtLeastOnce = false;
+
             // default reconnect time is 1 second so wait some extra time to be sure it has time to try to bring it
             // back to life. usually this passes on the first attempt, but docker is sometimes slow and we get failures
             // waiting for Gremlin Server to pop back up
@@ -398,12 +402,15 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                 try {
                     final int result = client.submit("1+1").all().join().get(0).getInt();
                     assertEquals(2, result);
+                    succeedAtLeastOnce = true;
                     break;
                 } catch (Exception ignored) {
                     logger.warn("Attempt {} failed on shouldEventuallySucceedOnSameServerWithDefault", ix);
                 }
             }
 
+            assertThat(succeedAtLeastOnce, is(true));
+
         } finally {
             cluster.close();
         }

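Editor's note on the commit above: the added `succeedAtLeastOnce` flag matters because the retry loop swallows every exception. With the only assertion inside the `try`, a run in which every attempt fails never reaches an assertion and the test passes anyway. The following is a minimal sketch of that pattern in isolation, not the project's test code. `RetryCheck` and `succeedsWithin` are hypothetical names, and a `Callable<Integer>` stands in for the `client.submit("1+1")` call.

```java
import java.util.concurrent.Callable;

// Hypothetical helper for illustration only; not part of TinkerPop.
final class RetryCheck {

    // Tries the call up to maxAttempts times and reports whether any attempt
    // returned the expected value. Exceptions are swallowed, as in the test,
    // so the caller must check the returned flag after the loop.
    static boolean succeedsWithin(final int maxAttempts, final int expected,
                                  final Callable<Integer> call) {
        boolean succeededAtLeastOnce = false;
        for (int ix = 0; ix < maxAttempts; ix++) {
            try {
                if (call.call() == expected) {
                    succeededAtLeastOnce = true;
                    break;
                }
            } catch (Exception ignored) {
                // a failed attempt is expected while the server comes back up
            }
        }
        return succeededAtLeastOnce;
    }
}
```

A caller would assert on the returned flag once, after the loop, which corresponds to the `assertThat(succeedAtLeastOnce, is(true))` line added in the diff.
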
[tinkerpop] 01/07: TINKERPOP-2245 Added UnifiedChannelizer

Posted by sp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 0fa8f84d31b9b47698f6a27db48dbf9ac6a0cd89
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Thu Mar 25 15:30:25 2021 -0400

    TINKERPOP-2245 Added UnifiedChannelizer
    
    The UnifiedChannelizer consolidates gremlin execution threadpools and unifies/streamlines server request processing between sessionless and sessionful requests.
---
 .travis.yml                                        |   4 +
 CHANGELOG.asciidoc                                 |   1 +
 docs/src/upgrade/release-3.5.x.asciidoc            |  11 +
 .../apache/tinkerpop/gremlin/driver/Handler.java   |   1 -
 .../gremlin/server/AbstractChannelizer.java        |  29 +-
 .../apache/tinkerpop/gremlin/server/Context.java   |  51 ++
 .../tinkerpop/gremlin/server/GremlinServer.java    |  13 +-
 .../apache/tinkerpop/gremlin/server/Settings.java  |  32 +
 .../gremlin/server/channel/HttpChannelizer.java    |   4 +-
 .../gremlin/server/channel/UnifiedChannelizer.java |  77 +++
 .../server/channel/WebSocketChannelizer.java       |  12 +-
 .../handler/AbstractAuthenticationHandler.java     |  12 +-
 .../gremlin/server/handler/AbstractRexster.java    | 719 +++++++++++++++++++++
 .../handler/HttpBasicAuthenticationHandler.java    |  11 +-
 .../gremlin/server/handler/MultiRexster.java       | 207 ++++++
 .../tinkerpop/gremlin/server/handler/Rexster.java  |  84 +++
 .../SaslAndHttpBasicAuthenticationHandler.java     |  25 +-
 .../server/handler/SaslAuthenticationHandler.java  |  11 +-
 .../gremlin/server/handler/SingleRexster.java      |  73 +++
 .../gremlin/server/handler/UnifiedHandler.java     | 283 ++++++++
 .../handler/WsAndHttpChannelizerHandler.java       |   6 +-
 .../AbstractGremlinServerIntegrationTest.java      |  31 +-
 .../tinkerpop/gremlin/server/ContextTest.java      |   3 +-
 .../gremlin/server/GremlinDriverIntegrateTest.java |  68 +-
 ...emlinServerAuditLogDeprecatedIntegrateTest.java |   5 +-
 .../server/GremlinServerAuditLogIntegrateTest.java |   4 +-
 .../server/GremlinServerAuthKrb5IntegrateTest.java |   2 +-
 .../server/GremlinServerAuthzIntegrateTest.java    |   3 +
 .../server/GremlinServerHttpIntegrateTest.java     |   4 +-
 .../gremlin/server/GremlinServerIntegrateTest.java |  30 +-
 .../server/GremlinServerSessionIntegrateTest.java  |  73 ++-
 .../channel/HttpChannelizerIntegrateTest.java      |   5 +
 ...t.java => UnifiedChannelizerIntegrateTest.java} |  42 +-
 gremlin-tools/gremlin-benchmark/pom.xml            |   5 -
 hadoop-gremlin/pom.xml                             |   5 -
 35 files changed, 1827 insertions(+), 119 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index a1456e7..ff3a058 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -60,6 +60,10 @@ jobs:
       name: "gremlin server"
     - script:
         - "mvn clean install -q -DskipTests -Dci"
+        - "travis_wait 60 mvn verify -pl :gremlin-server -DskipTests -DskipIntegrationTests=false -DincludeNeo4j -DtestUnified=true"
+      name: "gremlin server - unified"
+    - script:
+        - "mvn clean install -q -DskipTests -Dci"
         - "mvn verify -pl :gremlin-console -DskipTests -DskipIntegrationTests=false"
       name: "gremlin console"
     - script:
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index b15b0ab..041c278 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -30,6 +30,7 @@ This release also includes changes from <<release-3-4-3, 3.4.3>>.
 * Added a fully shaded version of `gremlin-driver`.
 * Exposed websocket connection status in JavaScript driver.
 * Fixed a bug where spark-gremlin was not re-attaching properties when using `dedup()`.
+* Fixed a bug in `WsAndHttpChannelizer` pipeline configuration where failed object aggregation could not write back HTTP responses.
 * Ensured better consistency of the use of `null` as arguments to mutation steps.
 * Added a `ResponseStatusCode` to indicate that a client should retry its request.
 * Added `TemporaryException` interface to indicate that a transaction can be retried.
diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index 77a7ce0..3b99d58 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -410,6 +410,17 @@ these values are not hashable and will result in an error. By introducing a `Has
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-2395[TINKERPOP-2395],
 link:https://issues.apache.org/jira/browse/TINKERPOP-2407[TINKERPOP-2407]
 
+==== Gremlin Server UnifiedChannelizer
+
+Just some notes for later:
+
+* UnifiedChannelizer technically replaces all existing implementations but is not yet the default
+* Some new settings related to it: maxParameters, sessionLifetimeTimeout, useGlobalFunctionCacheForSessions, useCommonEngineForSessions
+* Session behavior shifts slightly under this channelizer for async calls, where a failure will mean that the session
+will close, remaining requests in the queue will be ignored and rollback will occur.
+* care should be taken with strict transaction management and multi-graph transactions (which aren't real - not a new thing)
+* absolute max lifetime of a session is a new thing
+
 ==== Gremlin Server Audit Logging
 
 The `authentication.enableAuditlog` configuration property is deprecated, but replaced by the `enableAuditLog` property
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index aa15597..11bf8b4 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -222,7 +222,6 @@ final class Handler {
             final ResultQueue queue = pending.get(response.getRequestId());
             if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
                 final Object data = response.getResult().getData();
-                final Map<String,Object> meta = response.getResult().getMeta();
 
                 // this is a "result" from the server which is either the result of a script or a
                 // serialized traversal
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
index 1e2287ea..ee02bd8 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
@@ -50,6 +50,7 @@ import javax.net.ssl.TrustManagerFactory;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Constructor;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
@@ -98,6 +99,7 @@ public abstract class AbstractChannelizer extends ChannelInitializer<SocketChann
     public static final String PIPELINE_AUTHORIZER = "authorizer";
     public static final String PIPELINE_REQUEST_HANDLER = "request-handler";
     public static final String PIPELINE_HTTP_RESPONSE_ENCODER = "http-response-encoder";
+    public static final String PIPELINE_HTTP_AGGREGATOR = "http-aggregator";
     public static final String PIPELINE_WEBSOCKET_SERVER_COMPRESSION = "web-socket-server-compression-handler";
 
     protected static final String PIPELINE_SSL = "ssl";
@@ -182,10 +184,29 @@ public abstract class AbstractChannelizer extends ChannelInitializer<SocketChann
     protected AbstractAuthenticationHandler createAuthenticationHandler(final Settings settings) {
         try {
             final Class<?> clazz = Class.forName(settings.authentication.authenticationHandler);
-            final Class[] constructorArgs = new Class[2];
-            constructorArgs[0] = Authenticator.class;
-            constructorArgs[1] = Settings.class;
-            return (AbstractAuthenticationHandler) clazz.getDeclaredConstructor(constructorArgs).newInstance(authenticator, settings);
+            AbstractAuthenticationHandler aah;
+            try {
+                // the three arg constructor is the new form as a handler may need the authorizer in some cases
+                final Class<?>[] threeArgForm = new Class[]{Authenticator.class, Authorizer.class, Settings.class};
+                final Constructor<?> threeArgConstructor = clazz.getDeclaredConstructor(threeArgForm);
+                return (AbstractAuthenticationHandler) threeArgConstructor.newInstance(authenticator, authorizer, settings);
+            } catch (Exception threeArgEx) {
+                try {
+                    // the two arg constructor is the "old form" that existed prior to Authorizers. should probably
+                    // deprecate this form
+                    final Class<?>[] twoArgForm = new Class[]{Authenticator.class, Settings.class};
+                    final Constructor<?> twoArgConstructor = clazz.getDeclaredConstructor(twoArgForm);
+
+                    if (authorizer != null) {
+                        logger.warn("There is an authorizer configured but the {} does not have a constructor of ({}, {}, {}) so it cannot be added",
+                                clazz.getName(), Authenticator.class.getSimpleName(), Authorizer.class.getSimpleName(), Settings.class.getSimpleName());
+                    }
+
+                    return (AbstractAuthenticationHandler) twoArgConstructor.newInstance(authenticator, settings);
+                } catch (Exception twoArgEx) {
+                    throw twoArgEx;
+                }
+            }
         } catch (Exception ex) {
             logger.warn(ex.getMessage());
             throw new IllegalStateException(String.format("Could not create/configure AuthenticationHandler %s", settings.authentication.authenticationHandler), ex);
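Editor's note on the `createAuthenticationHandler` change above: the handler is now built by trying the newer constructor form first and falling back to the older one. The following is a minimal sketch of that reflection pattern under stated assumptions, not the project's code. `ConstructorFallback` and `create` are hypothetical names. Unlike the diff, which catches any `Exception` from the first attempt, this sketch falls back only when the preferred constructor does not exist, so a preferred constructor that throws is reported rather than masked.

```java
import java.lang.reflect.Constructor;

// Hypothetical helper for illustration only; not part of TinkerPop.
final class ConstructorFallback {

    // Instantiates clazz with the preferred constructor signature if it is
    // declared, otherwise with the fallback signature. Any reflective failure
    // is wrapped in an IllegalStateException.
    static Object create(final Class<?> clazz,
                         final Class<?>[] preferredTypes, final Object[] preferredArgs,
                         final Class<?>[] fallbackTypes, final Object[] fallbackArgs) {
        try {
            Constructor<?> ctor;
            Object[] args;
            try {
                ctor = clazz.getDeclaredConstructor(preferredTypes);
                args = preferredArgs;
            } catch (NoSuchMethodException preferredMissing) {
                // only a missing constructor triggers the fallback
                ctor = clazz.getDeclaredConstructor(fallbackTypes);
                args = fallbackArgs;
            }
            return ctor.newInstance(args);
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Could not create " + clazz.getName(), ex);
        }
    }
}
```

Whether narrowing the fallback condition is desirable for Gremlin Server is a design question the commit does not settle; the sketch only shows the two options side by side with the diff.
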
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index 902a788..fcd2072 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -18,16 +18,21 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinScriptChecker;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.server.handler.Frame;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -45,6 +50,11 @@ public class Context {
     private final GremlinExecutor gremlinExecutor;
     private final ScheduledExecutorService scheduledExecutorService;
     private final AtomicBoolean finalResponseWritten = new AtomicBoolean();
+    private final long requestTimeout;
+    private final RequestContentType requestContentType;
+    private final Object gremlinArgument;
+
+    public enum RequestContentType { BYTECODE, SCRIPT, UNKNOWN }
 
     public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx,
                    final Settings settings, final GraphManager graphManager,
@@ -55,6 +65,23 @@ public class Context {
         this.graphManager = graphManager;
         this.gremlinExecutor = gremlinExecutor;
         this.scheduledExecutorService = scheduledExecutorService;
+
+        // order of calls matters as one depends on the next
+        this.gremlinArgument = requestMessage.getArgs().get(Tokens.ARGS_GREMLIN);
+        this.requestContentType = determineRequestContents();
+        this.requestTimeout = determineTimeout();
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public RequestContentType getRequestContentType() {
+        return requestContentType;
+    }
+
+    public Object getGremlinArgument() {
+        return gremlinArgument;
     }
 
     public ScheduledExecutorService getScheduledExecutorService() {
@@ -133,4 +160,28 @@ public class Context {
         }
 
     }
+
+    private RequestContentType determineRequestContents() {
+        if (gremlinArgument instanceof Bytecode)
+            return RequestContentType.BYTECODE;
+        else if (gremlinArgument instanceof String)
+            return RequestContentType.SCRIPT;
+        else
+            return RequestContentType.UNKNOWN;
+    }
+
+    private long determineTimeout() {
+        // timeout override - handle both deprecated and newly named configuration. earlier logic should prevent
+        // both configurations from being submitted at the same time
+        final Map<String, Object> args = requestMessage.getArgs();
+        final long seto = args.containsKey(Tokens.ARGS_EVAL_TIMEOUT) ?
+                ((Number) args.get(Tokens.ARGS_EVAL_TIMEOUT)).longValue() : settings.getEvaluationTimeout();
+
+        // override the timeout if the lifecycle has a value assigned. if the script contains with(timeout)
+        // options then allow that value to override what's provided on the lifecycle
+        final Optional<Long> timeoutDefinedInScript = requestContentType == RequestContentType.SCRIPT ?
+                GremlinScriptChecker.parse(gremlinArgument.toString()).getTimeout() : Optional.empty();
+
+        return timeoutDefinedInScript.orElse(seto);
+    }
 }
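Editor's note on `determineTimeout()` above: the method resolves the request timeout from three sources, in increasing priority: the server setting, a per-request argument, and a timeout found in the script text itself. The following is a minimal sketch of that precedence only, not the project's code. `TimeoutResolution` and `resolve` are hypothetical names; the argument key is passed in rather than assuming the value of `Tokens.ARGS_EVAL_TIMEOUT`, and the script-derived timeout is passed as an `Optional<Long>` instead of calling `GremlinScriptChecker`.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of TinkerPop.
final class TimeoutResolution {

    // Returns the script-defined timeout if present, else the per-request
    // argument if present, else the server default.
    static long resolve(final Map<String, Object> requestArgs, final String timeoutKey,
                        final long serverDefault, final Optional<Long> scriptTimeout) {
        final long perRequest = requestArgs.containsKey(timeoutKey)
                ? ((Number) requestArgs.get(timeoutKey)).longValue()
                : serverDefault;
        return scriptTimeout.orElse(perRequest);
    }
}
```

The cast to `Number` mirrors the diff and lets the argument arrive as an `Integer` or a `Long`, which can differ depending on how the request was serialized.
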
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
index 2a1adf7..b322aa7 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
@@ -78,6 +78,7 @@ public class GremlinServer {
     private final ExecutorService gremlinExecutorService;
     private final ServerGremlinExecutor serverGremlinExecutor;
     private final boolean isEpollEnabled;
+    private Channelizer channelizer;
 
     /**
      * Construct a Gremlin Server instance from {@link Settings}.
@@ -159,7 +160,7 @@ public class GremlinServer {
                 }
             });
 
-            final Channelizer channelizer = createChannelizer(settings);
+            channelizer = createChannelizer(settings);
             channelizer.init(serverGremlinExecutor);
             b.group(bossGroup, workerGroup)
                     .childHandler(channelizer);
@@ -284,8 +285,10 @@ public class GremlinServer {
             }
 
             try {
-                if (gremlinExecutorService != null)
-                    gremlinExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS);
+                if (gremlinExecutorService != null) {
+                    if (!gremlinExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS))
+                        logger.warn("Gremlin thread pool did not fully terminate - continuing with shutdown process");
+                }
             } catch (InterruptedException ie) {
                 logger.warn("Timeout waiting for Gremlin thread pool to shutdown - continuing with shutdown process.");
             }
@@ -330,6 +333,10 @@ public class GremlinServer {
         return serverGremlinExecutor;
     }
 
+    public Channelizer getChannelizer() {
+        return channelizer;
+    }
+
     public static void main(final String[] args) throws Exception {
         // add to vm options: -Dlog4j.configuration=file:conf/log4j.properties
         printHeader();
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
index 3a535ab..3f84c19 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
 import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
 import org.apache.tinkerpop.gremlin.server.handler.AbstractAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.server.util.DefaultGraphManager;
@@ -197,6 +198,37 @@ public class Settings {
     public String graphManager = DefaultGraphManager.class.getName();
 
     /**
+     * Maximum number of parameters that can be passed on a request. Larger numbers may impact performance for scripts.
+     * The default is 16 and this setting only applies to the {@link UnifiedChannelizer}.
+     */
+    public int maxParameters = 16;
+
+    /**
+     * The maximum time in milliseconds that a {@link UnifiedChannelizer} session can exist. A session's lifetime
+     * cannot be extended beyond this value irrespective of the number of requests and their individual timeouts.
+     * Requests must complete within this time frame. The default is 10 minutes.
+     */
+    public long sessionLifetimeTimeout = 600000;
+
+    /**
+     * Enable the global function cache for sessions when using the {@link UnifiedChannelizer}. This setting is only
+     * relevant when {@link #useCommonEngineForSessions} is {@code false}. When {@code true} it means that
+     * functions created in one request to a session remain available on the next request to that session.
+     */
+    public boolean useGlobalFunctionCacheForSessions = true;
+
+    /**
+     * When {@code true} and using the {@link UnifiedChannelizer} the same engine that will be used to serve
+     * sessionless requests will also be used to serve sessions. The default value of {@code true} is recommended as
+     * it reduces the amount of object creation required for each session and should generally lead to better
+     * performance especially if the expectation is that there will be many sessions being created and destroyed
+     * rapidly. Setting this value to {@code false} is mostly present to support specific use cases that may require
+     * each session having its own engine or to match previous functionality provided by the older channelizers
+     * produced prior to 3.5.0.
+     */
+    public boolean useCommonEngineForSessions = true;
+
+    /**
      * Configured metrics for Gremlin Server.
      */
     public ServerMetrics metrics = null;
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
index 7b3ad99..be2a7af 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
@@ -62,7 +62,9 @@ public class HttpChannelizer extends AbstractChannelizer {
         if (logger.isDebugEnabled())
             pipeline.addLast(new LoggingHandler("http-io", LogLevel.DEBUG));
 
-        pipeline.addLast(new HttpObjectAggregator(settings.maxContentLength));
+        final HttpObjectAggregator aggregator = new HttpObjectAggregator(settings.maxContentLength);
+        aggregator.setMaxCumulationBufferComponents(settings.maxAccumulationBufferComponents);
+        pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator);
 
         if (authenticator != null) {
             // Cannot add the same handler instance multiple times unless
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java
new file mode 100644
index 0000000..522baa2
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.channel;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.tinkerpop.gremlin.server.AbstractChannelizer;
+import org.apache.tinkerpop.gremlin.server.Channelizer;
+import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
+import org.apache.tinkerpop.gremlin.server.handler.UnifiedHandler;
+import org.apache.tinkerpop.gremlin.server.handler.WsAndHttpChannelizerHandler;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
+
+/**
+ * A {@link Channelizer} that supports websocket and HTTP requests and does so with the most streamlined processing
+ * model for Gremlin Server introduced with 3.5.0.
+ */
+public class UnifiedChannelizer extends AbstractChannelizer {
+
+    private WsAndHttpChannelizerHandler handler;
+    private UnifiedHandler unifiedHandler;
+    protected static final String PIPELINE_UNIFIED = "unified";
+
+    @Override
+    public void init(final ServerGremlinExecutor serverGremlinExecutor) {
+        super.init(serverGremlinExecutor);
+        handler = new WsAndHttpChannelizerHandler();
+        handler.init(serverGremlinExecutor, new HttpGremlinEndpointHandler(serializers, gremlinExecutor, graphManager, settings));
+
+        // these handlers don't share any state and can thus be initialized once per pipeline
+        unifiedHandler = new UnifiedHandler(settings, graphManager, gremlinExecutor, scheduledExecutorService, this);
+    }
+
+    @Override
+    public void configure(final ChannelPipeline pipeline) {
+        handler.configure(pipeline);
+        pipeline.addAfter(PIPELINE_HTTP_REQUEST_DECODER, "WsAndHttpChannelizerHandler", handler);
+    }
+
+    @Override
+    public void finalize(final ChannelPipeline pipeline) {
+        super.finalize(pipeline);
+        pipeline.remove(PIPELINE_OP_SELECTOR);
+        pipeline.remove(PIPELINE_OP_EXECUTOR);
+
+        pipeline.addLast(PIPELINE_UNIFIED, unifiedHandler);
+    }
+
+    public UnifiedHandler getUnifiedHandler() {
+        return unifiedHandler;
+    }
+
+    @Override
+    public boolean supportsIdleMonitor() {
+        return true;
+    }
+
+    @Override
+    public Object createIdleDetectionMessage() {
+        return handler.getWsChannelizer().createIdleDetectionMessage();
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
index c4ff402..a1864fd 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
@@ -81,8 +81,11 @@ public class WebSocketChannelizer extends AbstractChannelizer {
 
     @Override
     public void configure(final ChannelPipeline pipeline) {
+
         if (logger.isDebugEnabled())
-            pipeline.addLast(new LoggingHandler("log-io", LogLevel.DEBUG));
+            pipeline.addLast(new LoggingHandler("log-encoder-aggregator", LogLevel.DEBUG));
+
+        pipeline.addLast(PIPELINE_HTTP_RESPONSE_ENCODER, new HttpResponseEncoder());
 
         logger.debug("HttpRequestDecoder settings - maxInitialLineLength={}, maxHeaderSize={}, maxChunkSize={}",
                 settings.maxInitialLineLength, settings.maxHeaderSize, settings.maxChunkSize);
@@ -95,12 +98,7 @@ public class WebSocketChannelizer extends AbstractChannelizer {
                 settings.maxContentLength, settings.maxAccumulationBufferComponents);
         final HttpObjectAggregator aggregator = new HttpObjectAggregator(settings.maxContentLength);
         aggregator.setMaxCumulationBufferComponents(settings.maxAccumulationBufferComponents);
-        pipeline.addLast("aggregator", aggregator);
-
-        if (logger.isDebugEnabled())
-            pipeline.addLast(new LoggingHandler("log-aggregator-encoder", LogLevel.DEBUG));
-
-        pipeline.addLast(PIPELINE_HTTP_RESPONSE_ENCODER, new HttpResponseEncoder());
+        pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator);
         // Add compression extension for WebSocket defined in https://tools.ietf.org/html/rfc7692
         pipeline.addLast(PIPELINE_WEBSOCKET_SERVER_COMPRESSION, new WebSocketServerCompressionHandler());
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
index 026ad59..074e4ab 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
@@ -21,15 +21,25 @@ package org.apache.tinkerpop.gremlin.server.handler;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 
 /**
  * Provides an abstraction point to allow for http auth schemes beyond basic auth.
  */
 public abstract class AbstractAuthenticationHandler extends ChannelInboundHandlerAdapter {
     protected final Authenticator authenticator;
+    protected final Authorizer authorizer;
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #AbstractAuthenticationHandler(Authenticator, Authorizer)}.
+     */
+    @Deprecated
     public AbstractAuthenticationHandler(final Authenticator authenticator) {
-        this.authenticator = authenticator;
+        this(authenticator, null);
     }
 
+    public AbstractAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer) {
+        this.authenticator = authenticator;
+        this.authorizer = authorizer;
+    }
 }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
new file mode 100644
index 0000000..7f7c51c
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
@@ -0,0 +1,719 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import groovy.lang.GroovyRuntimeException;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.TimedInterruptTimeoutException;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
+import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.server.GremlinServer;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
+import org.apache.tinkerpop.gremlin.server.util.ExceptionHelper;
+import org.apache.tinkerpop.gremlin.server.util.TraverserIterator;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.util.TemporaryException;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.codehaus.groovy.control.MultipleCompilationErrorsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.Bindings;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+/**
+ * A base implementation of {@link Rexster} which offers some common functionality that matches typical Gremlin Server
+ * request response expectations for script, bytecode and graph operations. The class is designed to be extended but
+ * take care in understanding the way that different methods are called as they do depend on one another a bit. It
+ * may be best to examine the source code to determine how best to use this class or to extend from the higher order
+ * classes of {@link SingleRexster} or {@link MultiRexster}.
+ */
+public abstract class AbstractRexster implements Rexster, AutoCloseable {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractRexster.class);
+    private static final Logger auditLogger = LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME);
+
+    private final Channel initialChannel;
+    private final boolean transactionManaged;
+    private final String sessionId;
+    private final AtomicReference<ScheduledFuture<?>> sessionCancelFuture = new AtomicReference<>();
+    private final AtomicReference<Future<?>> sessionFuture = new AtomicReference<>();
+    private long actualTimeoutLength = 0;
+    private boolean actualTimeoutCausedBySession = false;
+    protected final GraphManager graphManager;
+    protected final ConcurrentMap<String, Rexster> sessions;
+    protected final Set<String> aliasesUsedByRexster = new HashSet<>();
+
+    AbstractRexster(final Context gremlinContext, final String sessionId,
+                    final boolean transactionManaged, final ConcurrentMap<String, Rexster> sessions) {
+        this.transactionManaged = transactionManaged;
+        this.sessionId = sessionId;
+        this.initialChannel = gremlinContext.getChannelHandlerContext().channel();
+
+        // close Rexster if the channel closes to clean up and close transactions
+        this.initialChannel.closeFuture().addListener(f -> {
+            // cancel session worker or it will keep waiting for items to appear in the session queue
+            final Future<?> sf = sessionFuture.get();
+            if (sf != null && !sf.isDone()) {
+                sf.cancel(true);
+            }
+            close();
+        });
+        this.sessions = sessions;
+        this.graphManager = gremlinContext.getGraphManager();
+    }
+
+    public boolean isTransactionManaged() {
+        return transactionManaged;
+    }
+
+    public String getSessionId() {
+        return sessionId;
+    }
+
+    public boolean isBoundTo(final Channel channel) {
+        return channel == initialChannel;
+    }
+
+    public long getActualTimeoutLength() {
+        return actualTimeoutLength;
+    }
+
+    public boolean isActualTimeoutCausedBySession() {
+        return actualTimeoutCausedBySession;
+    }
+
+    public GremlinScriptEngine getScriptEngine(final Context context, final String language) {
+        return context.getGremlinExecutor().getScriptEngineManager().getEngineByName(language);
+    }
+
+    @Override
+    public void setSessionCancelFuture(final ScheduledFuture<?> f) {
+        if (!sessionCancelFuture.compareAndSet(null, f))
+            throw new IllegalStateException("Session cancellation future is already set");
+    }
+
+    @Override
+    public void setSessionFuture(final Future<?> f) {
+        if (!sessionFuture.compareAndSet(null, f))
+            throw new IllegalStateException("Session future is already set");
+    }
+
+    @Override
+    public synchronized void triggerTimeout(final long timeout, final boolean causedBySession) {
+        // triggering timeout triggers the stop of the Rexster Runnable which will end in close()
+        // for final cleanup
+        final Future<?> f = sessionFuture.get();
+        if (f != null && !f.isDone()) {
+            actualTimeoutCausedBySession = causedBySession;
+            actualTimeoutLength = timeout;
+            f.cancel(true);
+        }
+    }
+
+    protected void process(final Context gremlinContext) throws RexsterException {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final Map<String, Object> args = msg.getArgs();
+        final Object gremlinToExecute = args.get(Tokens.ARGS_GREMLIN);
+
+        // for strict transactions track the aliases used so that we can commit them and only them on close()
+        if (gremlinContext.getSettings().strictTransactionManagement)
+            msg.optionalArgs(Tokens.ARGS_ALIASES).ifPresent(m -> aliasesUsedByRexster.addAll(((Map<String,String>) m).values()));
+
+        try {
+            // itty is optional as Bytecode could be a "graph operation" rather than a Traversal. graph operations
+            // don't need to be iterated and handle their own lifecycle
+            final Optional<Iterator<?>> itty = gremlinToExecute instanceof Bytecode ?
+                    fromBytecode(gremlinContext, (Bytecode) gremlinToExecute) :
+                    Optional.of(fromScript(gremlinContext, (String) gremlinToExecute));
+
+            processAuditLog(gremlinContext.getSettings(), gremlinContext.getChannelHandlerContext(), gremlinToExecute);
+
+            if (itty.isPresent())
+                handleIterator(gremlinContext, itty.get());
+        } catch (Exception ex) {
+            handleException(gremlinContext, ex);
+        }
+    }
+
+    protected void handleException(final Context gremlinContext, final Throwable t) throws RexsterException {
+        if (t instanceof RexsterException) throw (RexsterException) t;
+
+        final Optional<Throwable> possibleTemporaryException = determineIfTemporaryException(t);
+        if (possibleTemporaryException.isPresent()) {
+            final Throwable temporaryException = possibleTemporaryException.get();
+            throw new RexsterException(temporaryException.getMessage(), t,
+                    ResponseMessage.build(gremlinContext.getRequestMessage())
+                            .code(ResponseStatusCode.SERVER_ERROR_TEMPORARY)
+                            .statusMessage(temporaryException.getMessage())
+                            .statusAttributeException(temporaryException).create());
+        }
+
+        final Throwable root = ExceptionUtils.getRootCause(t);
+
+        if (root instanceof TimedInterruptTimeoutException) {
+            // occurs when the TimedInterruptCustomizerProvider is in play
+            final String msg = String.format("A timeout occurred within the script during evaluation of [%s] - consider increasing the limit given to TimedInterruptCustomizerProvider",
+                    gremlinContext.getRequestMessage().getRequestId());
+            throw new RexsterException(msg, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .statusMessage("Timeout during script evaluation triggered by TimedInterruptCustomizerProvider")
+                    .create());
+        }
+
+        if (root instanceof TimeoutException) {
+            final String errorMessage = String.format("Script evaluation exceeded the configured threshold for request [%s]",
+                    gremlinContext.getRequestMessage().getRequestId());
+            throw new RexsterException(errorMessage, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .statusMessage(t.getMessage())
+                    .create());
+        }
+
+        if (root instanceof InterruptedException ||
+                root instanceof TraversalInterruptedException ||
+                root instanceof InterruptedIOException) {
+            final String msg = actualTimeoutCausedBySession ?
+                    String.format("Session closed - %s - sessionLifetimeTimeout of %s ms exceeded", sessionId, actualTimeoutLength) :
+                    String.format("Evaluation exceeded timeout threshold of %s ms", actualTimeoutLength);
+            throw new RexsterException(msg, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .statusMessage(msg).create());
+        }
+
+        if (root instanceof MultipleCompilationErrorsException && root.getMessage().contains("Method too large") &&
+                ((MultipleCompilationErrorsException) root).getErrorCollector().getErrorCount() == 1) {
+            final String errorMessage = String.format("The Gremlin statement that was submitted exceeds the maximum compilation size allowed by the JVM, please split it into multiple smaller statements - %s", trimMessage(gremlinContext.getRequestMessage()));
+            logger.warn(errorMessage);
+            throw new RexsterException(errorMessage, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+                    .statusMessage(errorMessage)
+                    .statusAttributeException(root).create());
+        }
+
+        // GroovyRuntimeException will hit a pretty wide range of eval type errors, like MissingPropertyException,
+        // CompilationFailedException, MissingMethodException, etc. If more specific handling is required then
+        // try to catch it earlier above.
+        if (root instanceof GroovyRuntimeException ||
+                root instanceof VerificationException ||
+                root instanceof ScriptException) {
+            throw new RexsterException(root.getMessage(), root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+                    .statusMessage(root.getMessage())
+                    .statusAttributeException(root).create());
+        }
+
+        throw new RexsterException(root.getClass().getSimpleName() + ": " + root.getMessage(), root,
+                ResponseMessage.build(gremlinContext.getRequestMessage())
+                        .code(ResponseStatusCode.SERVER_ERROR)
+                        .statusAttributeException(root)
+                        .statusMessage(root.getMessage()).create());
+    }
+
+    /**
+     * Used to decrease the size of a Gremlin script that triggered a "method too large" exception so that it
+     * doesn't log a massive text string nor return a large error message.
+     */
+    private RequestMessage trimMessage(final RequestMessage msg) {
+        final RequestMessage trimmedMsg = RequestMessage.from(msg).create();
+        if (trimmedMsg.getArgs().containsKey(Tokens.ARGS_GREMLIN))
+            trimmedMsg.getArgs().put(Tokens.ARGS_GREMLIN, trimmedMsg.getArgs().get(Tokens.ARGS_GREMLIN).toString().substring(0, 1021) + "...");
+
+        return trimmedMsg;
+    }
+
+    /**
+     * Checks whether any exception in the chain is a {@link TemporaryException}. If one is found, the response should
+     * use the error code that tells the client to retry.
+     */
+    protected Optional<Throwable> determineIfTemporaryException(final Throwable ex) {
+        return Stream.of(ExceptionUtils.getThrowables(ex)).
+                filter(i -> i instanceof TemporaryException).findFirst();
+    }
+
+    @Override
+    public synchronized void close() {
+        // already closing/closed
+        if (!sessions.containsKey(sessionId)) return;
+
+        sessions.remove(sessionId);
+
+        if (sessionCancelFuture.get() != null) {
+            final ScheduledFuture<?> f = sessionCancelFuture.get();
+            if (!f.isDone()) f.cancel(true);
+        }
+    }
+
+    protected Iterator<?> fromScript(final Context gremlinContext, final String script) throws Exception {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final Map<String, Object> args = msg.getArgs();
+        final String language = args.containsKey(Tokens.ARGS_LANGUAGE) ? (String) args.get(Tokens.ARGS_LANGUAGE) : "gremlin-groovy";
+        return IteratorUtils.asIterator(getScriptEngine(gremlinContext, language).eval(
+                script, mergeBindingsFromRequest(gremlinContext, getWorkerBindings())));
+    }
+
+    protected Optional<Iterator<?>> fromBytecode(final Context gremlinContext, final Bytecode bytecode) throws Exception {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+
+        final Traversal.Admin<?, ?> traversal;
+        final Map<String, String> aliases = (Map<String, String>) msg.optionalArgs(Tokens.ARGS_ALIASES).get();
+        final GraphManager graphManager = gremlinContext.getGraphManager();
+        final String traversalSourceName = aliases.entrySet().iterator().next().getValue();
+        final TraversalSource g = graphManager.getTraversalSource(traversalSourceName);
+
+        // handle bytecode based graph operations like commit/rollback commands
+        if (BytecodeHelper.isGraphOperation(bytecode)) {
+            handleGraphOperation(gremlinContext, bytecode, g.getGraph());
+            return Optional.empty();
+        } else {
+
+            final Optional<String> lambdaLanguage = BytecodeHelper.getLambdaLanguage(bytecode);
+            if (!lambdaLanguage.isPresent())
+                traversal = JavaTranslator.of(g).translate(bytecode);
+            else {
+                final SimpleBindings bindings = new SimpleBindings();
+                bindings.put(traversalSourceName, g);
+                traversal = gremlinContext.getGremlinExecutor().getScriptEngineManager().getEngineByName(lambdaLanguage.get()).eval(bytecode, bindings, traversalSourceName);
+            }
+
+            // compile the traversal - without it getEndStep() has nothing in it
+            traversal.applyStrategies();
+
+            return Optional.of(new TraverserIterator(traversal));
+        }
+    }
+
+    protected Bindings getWorkerBindings() throws RexsterException {
+        return new SimpleBindings(graphManager.getAsBindings());
+    }
+
+    protected Bindings mergeBindingsFromRequest(final Context gremlinContext, final Bindings bindings) throws RexsterException {
+        // alias any global bindings to a different variable.
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        if (msg.getArgs().containsKey(Tokens.ARGS_ALIASES)) {
+            final Map<String, String> aliases = (Map<String, String>) msg.getArgs().get(Tokens.ARGS_ALIASES);
+            for (Map.Entry<String,String> aliasKv : aliases.entrySet()) {
+                boolean found = false;
+
+                // first check if the alias refers to a Graph instance
+                final Graph graph = gremlinContext.getGraphManager().getGraph(aliasKv.getValue());
+                if (null != graph) {
+                    bindings.put(aliasKv.getKey(), graph);
+                    found = true;
+                }
+
+                // if the alias wasn't found as a Graph then perhaps it is a TraversalSource - it needs to be
+                // something
+                if (!found) {
+                    final TraversalSource ts = gremlinContext.getGraphManager().getTraversalSource(aliasKv.getValue());
+                    if (null != ts) {
+                        bindings.put(aliasKv.getKey(), ts);
+                        found = true;
+                    }
+                }
+
+                // this validation is important to calls to GraphManager.commit() and rollback() as they both
+                // expect that the aliases supplied are valid
+                if (!found) {
+                    final String error = String.format("Could not alias [%s] to [%s] as [%s] not in the Graph or TraversalSource global bindings",
+                            aliasKv.getKey(), aliasKv.getValue(), aliasKv.getValue());
+                    throw new RexsterException(error, ResponseMessage.build(msg)
+                            .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
+                }
+            }
+        } else {
+            // there are no aliases so determine if that's ok with Gremlin Server
+            if (gremlinContext.getSettings().strictTransactionManagement) {
+                final String error = "Gremlin Server is configured with strictTransactionManagement as 'true' - the 'aliases' arguments must be provided";
+                throw new RexsterException(error, ResponseMessage.build(msg)
+                        .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
+            }
+        }
+
+        // add any bindings to override any other supplied
+        Optional.ofNullable((Map<String, Object>) msg.getArgs().get(Tokens.ARGS_BINDINGS)).ifPresent(bindings::putAll);
+        return bindings;
+    }
+
+    /**
+     * Provides a generic way of iterating a result set back to the client.
+     *
+     * @param gremlinContext The Gremlin Server {@link Context} object containing settings, request message, etc.
+     * @param itty The result to iterate
+     */
+    protected void handleIterator(final Context gremlinContext, final Iterator<?> itty) throws InterruptedException {
+        final ChannelHandlerContext nettyContext = gremlinContext.getChannelHandlerContext();
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final Settings settings = gremlinContext.getSettings();
+        boolean warnOnce = false;
+
+        // sessionless requests are always transaction managed, but in-session requests are configurable.
+        final boolean managedTransactionsForRequest = transactionManaged ?
+                true : (Boolean) msg.getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false);
+
+        // we have an empty iterator - happens on stuff like: g.V().iterate()
+        if (!itty.hasNext()) {
+            final Map<String, Object> attributes = generateStatusAttributes(gremlinContext,ResponseStatusCode.NO_CONTENT, itty);
+            // as there is nothing left to iterate if we are transaction managed then we should execute a
+            // commit here before we send back a NO_CONTENT which implies success
+            if (managedTransactionsForRequest)
+                closeTransaction(gremlinContext, Transaction.Status.COMMIT);
+
+            gremlinContext.writeAndFlush(ResponseMessage.build(msg)
+                    .code(ResponseStatusCode.NO_CONTENT)
+                    .statusAttributes(attributes)
+                    .create());
+            return;
+        }
+
+        // the batch size can be overridden by the request
+        final int resultIterationBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
+                .orElse(settings.resultIterationBatchSize);
+        List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
+
+        // use an external control to manage the loop as opposed to just checking hasNext() in the while.  this
+        // prevents situations where auto transactions create a new transaction after calls to commit() within
+        // the loop on calls to hasNext().
+        boolean hasMore = itty.hasNext();
+
+        while (hasMore) {
+            if (Thread.interrupted()) throw new InterruptedException();
+
+            // check if an implementation needs to force flush the aggregated results before the iteration batch
+            // size is reached.
+            // todo: what implementation does this?! can we kill it going forward - seems always false
+            // final boolean forceFlush = isForceFlushed(nettyContext, msg, itty);
+            final boolean forceFlush = false;
+
+            // have to check the aggregate size because it is possible that the channel is not writeable (below)
+            // so iterating next() if the message is not written and flushed would bump the aggregate size beyond
+            // the expected resultIterationBatchSize.  Total serialization time for the response remains in
+            // effect so if the client is "slow" it may simply timeout.
+            //
+            // there is a need to check hasNext() on the iterator because if the channel is not writeable the
+            // previous pass through the while loop will have next()'d the iterator and if it is "done" then a
+            // NoSuchElementException will raise its head. also need a check to ensure that this iteration doesn't
+            // require a forced flush which can be forced by sub-classes.
+            //
+            // this could be placed inside the isWriteable() portion of the if-then below but it seems better to
+            // allow iteration to continue into a batch if that is possible rather than just doing nothing at all
+            // while waiting for the client to catch up
+            if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next());
+
+            // Don't keep executor busy if client has already given up; there is no way to catch up if the channel is
+            // not active, and hence we should break the loop.
+            if (!nettyContext.channel().isActive()) {
+                if (managedTransactionsForRequest) {
+                    closeTransaction(gremlinContext, Transaction.Status.ROLLBACK);
+                }
+                break;
+            }
+
+            // send back a page of results if batch size is met or if it's the end of the results being iterated.
+            // also check writeability of the channel to prevent OOME for slow clients.
+            //
+            // clients might decide to close the Netty channel to the server with a CloseWebSocketFrame after errors
+            // like CorruptedFrameException. On the server, although the channel gets closed, there might be some
+            // executor threads waiting for watermark to clear which will not clear in these cases since client has
+            // already given up on these requests. This leads to these executors waiting for the client to consume
+            // results till the timeout. checking for isActive() should help prevent that.
+            if (nettyContext.channel().isActive() && nettyContext.channel().isWritable()) {
+                if (forceFlush || aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
+                    final ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
+                    Frame frame = null;
+                    try {
+                        frame = makeFrame(gremlinContext, aggregate, code, itty);
+                    } catch (Exception ex) {
+                        // a frame may use a ByteBuf which is reference counted - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+
+                        // exception is handled in makeFrame() - serialization error gets written back to driver
+                        // at that point
+                        if (managedTransactionsForRequest)
+                            closeTransaction(gremlinContext, Transaction.Status.ROLLBACK);
+                        break;
+                    }
+
+                    // track whether there is anything left in the iterator because it needs to be accessed after
+                    // the transaction could be closed - in that case a call to hasNext() could open a new transaction
+                    // unintentionally
+                    final boolean moreInIterator = itty.hasNext();
+
+                    try {
+                        // only need to reset the aggregation list if there's more stuff to write
+                        if (moreInIterator)
+                            aggregate = new ArrayList<>(resultIterationBatchSize);
+                        else {
+                            // iteration and serialization are both complete which means this finished successfully. note that
+                            // errors internal to script eval or timeout will rollback given GremlinServer's global configurations.
+                            // local errors will get rolled back below because the exceptions aren't thrown in those cases to be
+                            // caught by the GremlinExecutor for global rollback logic. this only needs to be committed if
+                            // there are no more items to iterate and serialization is complete
+                            if (managedTransactionsForRequest)
+                                closeTransaction(gremlinContext, Transaction.Status.COMMIT);
+
+                            // exit the result iteration loop as there are no more results left.  using this external control
+                            // because of the above commit.  some graphs may open a new transaction on the call to
+                            // hasNext()
+                            hasMore = false;
+                        }
+                    } catch (Exception ex) {
+                        // a frame may use a ByteBuf which is reference counted - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+                        throw ex;
+                    }
+
+                    if (!moreInIterator) iterateComplete(gremlinContext, itty);
+
+                    // the flush is called after the commit has potentially occurred.  in this way, if a commit was
+                    // required then it will be 100% complete before the client receives it. the "frame" at this point
+                    // should have completely detached objects from the transaction (i.e. serialization has occurred)
+                    // so a new one should not be opened on the flush down the netty pipeline
+                    gremlinContext.writeAndFlush(code, frame);
+                }
+            } else {
+                // don't keep triggering this warning over and over again for the same request
+                if (!warnOnce) {
+                    logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
+                    warnOnce = true;
+                }
+
+                // since the client is lagging we can hold here for a period of time for the client to catch up.
+                // this isn't blocking the IO thread - just a worker.
+                TimeUnit.MILLISECONDS.sleep(10);
+            }
+        }
+    }
+
+    /**
+     * If {@link Bytecode} is detected to contain a "graph operation" then it gets processed by this method.
+     */
+    protected void handleGraphOperation(final Context gremlinContext, final Bytecode bytecode, final Graph graph) throws Exception {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        if (graph.features().graph().supportsTransactions()) {
+            if (bytecode.equals(Bytecode.TX_COMMIT) || bytecode.equals(Bytecode.TX_ROLLBACK)) {
+                final boolean commit = bytecode.equals(Bytecode.TX_COMMIT);
+                closeTransaction(gremlinContext, commit ? Transaction.Status.COMMIT : Transaction.Status.ROLLBACK);
+
+                // write back a no-op for success
+                final Map<String, Object> attributes = generateStatusAttributes(gremlinContext,
+                        ResponseStatusCode.NO_CONTENT, Collections.emptyIterator());
+                gremlinContext.writeAndFlush(ResponseMessage.build(msg)
+                            .code(ResponseStatusCode.NO_CONTENT)
+                            .statusAttributes(attributes)
+                            .create());
+            } else {
+                throw new IllegalStateException(String.format(
+                        "Bytecode in request is not a recognized graph operation: %s", bytecode.toString()));
+            }
+        }
+    }
+
+    /**
+     * Called when iteration within {@link #handleIterator(Context, Iterator)} is on its final pass and the final
+     * frame is about to be sent back to the client. This method only gets called on successful iteration of the
+     * entire result.
+     */
+    protected void iterateComplete(final Context gremlinContext, final Iterator<?> itty) {
+        // do nothing by default
+    }
+
+    /**
+     * Generates response status meta-data to put on a {@link ResponseMessage}.
+     *
+     * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+     *             this method
+     */
+    protected Map<String, Object> generateStatusAttributes(final Context gremlinContext,
+                                                           final ResponseStatusCode code, final Iterator<?> itty) {
+        // only return server metadata on the last message
+        if (itty.hasNext()) return Collections.emptyMap();
+
+        final Map<String, Object> metaData = new HashMap<>();
+        metaData.put(Tokens.ARGS_HOST, gremlinContext.getChannelHandlerContext().channel().remoteAddress().toString());
+
+        return metaData;
+    }
+
+    /**
+     * Generates response result meta-data to put on a {@link ResponseMessage}.
+     *
+     * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+     *             this method
+     */
+    protected Map<String, Object> generateResponseMetaData(final Context gremlinContext,
+                                                           final ResponseStatusCode code, final Iterator<?> itty) {
+        return Collections.emptyMap();
+    }
+
+    protected Frame makeFrame(final Context gremlinContext, final List<Object> aggregate,
+                              final ResponseStatusCode code, final Iterator<?> itty) throws Exception {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final ChannelHandlerContext nettyContext = gremlinContext.getChannelHandlerContext();
+        final MessageSerializer serializer = nettyContext.channel().attr(StateKey.SERIALIZER).get();
+        final boolean useBinary = nettyContext.channel().attr(StateKey.USE_BINARY).get();
+
+        final Map<String, Object> responseMetaData = generateResponseMetaData(gremlinContext, code, itty);
+        final Map<String, Object> statusAttributes = generateStatusAttributes(gremlinContext, code, itty);
+        try {
+            if (useBinary) {
+                return new Frame(serializer.serializeResponseAsBinary(ResponseMessage.build(msg)
+                        .code(code)
+                        .statusAttributes(statusAttributes)
+                        .responseMetaData(responseMetaData)
+                        .result(aggregate).create(), nettyContext.alloc()));
+            } else {
+                // the expectation is that the GremlinTextRequestDecoder will have placed a MessageTextSerializer
+                // instance on the channel.
+                final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
+                return new Frame(textSerializer.serializeResponseAsString(ResponseMessage.build(msg)
+                        .code(code)
+                        .statusAttributes(statusAttributes)
+                        .responseMetaData(responseMetaData)
+                        .result(aggregate).create()));
+            }
+        } catch (Exception ex) {
+            logger.warn("The result [{}] in the request {} could not be serialized and returned.", aggregate, msg.getRequestId(), ex);
+            final String errorMessage = String.format("Error during serialization: %s", ExceptionHelper.getMessageFromExceptionOrCause(ex));
+            final ResponseMessage error = ResponseMessage.build(msg.getRequestId())
+                    .statusMessage(errorMessage)
+                    .statusAttributeException(ex)
+                    .code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create();
+            gremlinContext.writeAndFlush(error);
+            throw ex;
+        }
+    }
+
+    /**
+     * Called right before a transaction starts within {@link #run()}.
+     */
+    protected void startTransaction(final Context gremlinContext) {
+        // check if transactions are open and rollback first to ensure a fresh start.
+        graphManager.rollbackAll();
+    }
+
+    /**
+     * Close the transaction without a {@link Context} which supplies {@code null} to that argument for
+     * {@link #closeTransaction(Context, Transaction.Status)}. This method is idempotent.
+     */
+    protected void closeTransaction(final Transaction.Status status) {
+        closeTransaction(null, status);
+    }
+
+    /**
+     * Tries to close the transaction but will catch exceptions and log them. This method is idempotent.
+     */
+    protected void closeTransactionSafely(final Transaction.Status status) {
+        closeTransactionSafely(null, status);
+    }
+
+    /**
+     * Tries to close the transaction but will catch exceptions and log them. This method is idempotent.
+     */
+    protected void closeTransactionSafely(final Context gremlinContext, final Transaction.Status status) {
+        try {
+            closeTransaction(gremlinContext, status);
+        } catch (Exception ex) {
+            logger.error("Failed to close transaction", ex);
+        }
+    }
+
+    /**
+     * Closes a transaction with commit or rollback. Strict transaction management settings are observed when
+     * configured as such in {@link Settings#strictTransactionManagement} and when aliases are present on the
+     * request in the current {@link Context}. If the supplied {@link Context} is {@code null} then "strict" is
+     * bypassed so this form must be called with care. Bypassing is often useful to ensure that all transactions
+     * are cleaned up when multiple graphs are referenced. Prefer calling {@link #closeTransaction(Transaction.Status)}
+     * in this case instead. This method is idempotent.
+     */
+    protected void closeTransaction(final Context gremlinContext, final Transaction.Status status) {
+        if (status != Transaction.Status.COMMIT && status != Transaction.Status.ROLLBACK)
+            throw new IllegalStateException(String.format("Transaction.Status not supported: %s", status));
+
+        final boolean commit = status == Transaction.Status.COMMIT;
+        final boolean strict = gremlinContext != null && gremlinContext.getSettings().strictTransactionManagement;
+
+        if (strict) {
+            if (commit)
+                graphManager.commit(new HashSet<>(aliasesUsedByRexster));
+            else
+                graphManager.rollback(new HashSet<>(aliasesUsedByRexster));
+        } else {
+            if (commit)
+                graphManager.commitAll();
+            else
+                graphManager.rollbackAll();
+        }
+    }
+
+    private void processAuditLog(final Settings settings, final ChannelHandlerContext ctx, final Object gremlinToExecute) {
+        if (settings.enableAuditLog) {
+            AuthenticatedUser user = ctx.channel().attr(StateKey.AUTHENTICATED_USER).get();
+            if (null == user) {    // This is expected when using the AllowAllAuthenticator
+                user = AuthenticatedUser.ANONYMOUS_USER;
+            }
+            String address = ctx.channel().remoteAddress().toString();
+            if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
+            auditLogger.info("User {} with address {} requested: {}", user.getName(), address, gremlinToExecute);
+        }
+
+        if (settings.authentication.enableAuditLog) {
+            String address = ctx.channel().remoteAddress().toString();
+            if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
+            auditLogger.info("User with address {} requested: {}", address, gremlinToExecute);
+        }
+    }
+}
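
Review note: the `closeTransaction(Context, Transaction.Status)` logic above reduces to a small decision table. Commit or rollback is applied either to the aliases in use (strict mode with a non-null context) or to every graph. The following is only a minimal sketch of that branching. `RecordingGraphManager`, `TransactionCloser` and the boolean parameters are hypothetical stand-ins introduced for illustration and are not part of the commit.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the GraphManager calls used above; it only records what was invoked.
class RecordingGraphManager {
    final List<String> calls = new ArrayList<>();
    void commit(final Set<String> graphs) { calls.add("commit" + new TreeSet<>(graphs)); }
    void rollback(final Set<String> graphs) { calls.add("rollback" + new TreeSet<>(graphs)); }
    void commitAll() { calls.add("commitAll"); }
    void rollbackAll() { calls.add("rollbackAll"); }
}

class TransactionCloser {
    // Illustrative status enum; only COMMIT and ROLLBACK are accepted, as in the diff.
    enum Status { COMMIT, ROLLBACK, OPEN }

    // hasContext models "gremlinContext != null"; strictSetting models the strict transaction setting.
    static void close(final RecordingGraphManager gm, final Set<String> aliases,
                      final boolean hasContext, final boolean strictSetting, final Status status) {
        if (status != Status.COMMIT && status != Status.ROLLBACK)
            throw new IllegalStateException(String.format("Status not supported: %s", status));

        final boolean commit = status == Status.COMMIT;
        // without a context, "strict" is bypassed and every graph is affected
        final boolean strict = hasContext && strictSetting;

        if (strict) {
            if (commit) gm.commit(new HashSet<>(aliases));
            else gm.rollback(new HashSet<>(aliases));
        } else {
            if (commit) gm.commitAll();
            else gm.rollbackAll();
        }
    }
}
```

In this sketch, passing `hasContext = false` always falls through to the "all graphs" branch, which mirrors why the one-argument `closeTransaction(Transaction.Status)` form is described as a way to clean up every transaction.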
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
index a282874..a050ab0 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.server.Settings;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +55,16 @@ public class HttpBasicAuthenticationHandler extends AbstractAuthenticationHandle
 
     private final Base64.Decoder decoder = Base64.getUrlDecoder();
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #HttpBasicAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+     */
+    @Deprecated
     public HttpBasicAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
-        super(authenticator);
+        this(authenticator, null, settings);
+    }
+
+    public HttpBasicAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+        super(authenticator, authorizer);
         this.settings = settings;
     }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
new file mode 100644
index 0000000..539c16b
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngineManager;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.Bindings;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED;
+
+/**
+ * A {@link Rexster} implementation that queues tasks given to it and executes them in a serial fashion within the
+ * same thread which thus allows multiple tasks to be executed in the same transaction.
+ */
+public class MultiRexster extends AbstractRexster {
+    private static final Logger logger = LoggerFactory.getLogger(MultiRexster.class);
+    protected final BlockingQueue<Context> queue = new LinkedBlockingQueue<>();
+    private ScheduledFuture<?> requestCancelFuture;
+    private Bindings bindings;
+    private final AtomicBoolean ending = new AtomicBoolean(false);
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final GremlinScriptEngineManager scriptEngineManager;
+
+    MultiRexster(final Context gremlinContext, final String sessionId,
+                 final ConcurrentMap<String, Rexster> sessions) {
+        super(gremlinContext, sessionId, false, sessions);
+
+        // using a global function cache is cheaper than creating a new one per session especially if you have to
+        // create a lot of sessions. it will generate a ton of throw-away objects. mostly keeping the option open
+        // to not use it to preserve the ability to use the old functionality if wanted or if there is some specific
+        // use case with sessions that needs it. if we wanted this could eventually become a per-request option
+        // so that the client could control it as necessary and get scriptengine isolation if they need it.
+        if (gremlinContext.getSettings().useCommonEngineForSessions)
+            scriptEngineManager = gremlinContext.getGremlinExecutor().getScriptEngineManager();
+        else
+            scriptEngineManager = initializeGremlinExecutor(gremlinContext).getScriptEngineManager();
+
+        scheduledExecutorService = gremlinContext.getScheduledExecutorService();
+        addTask(gremlinContext);
+    }
+
+    @Override
+    public GremlinScriptEngine getScriptEngine(final Context gremlinContext, final String language) {
+        return scriptEngineManager.getEngineByName(language);
+    }
+
+    @Override
+    public boolean acceptingTasks() {
+        return !ending.get();
+    }
+
+    @Override
+    public void addTask(final Context gremlinContext) {
+        // todo: explicitly reject request???
+        if (acceptingTasks())
+            queue.offer(gremlinContext);
+    }
+
+    @Override
+    public void run() {
+        // there must be one item in the queue at least since addTask() gets called before the worker
+        // is ever started
+        Context gremlinContext = queue.poll();
+        if (null == gremlinContext)
+            throw new IllegalStateException(String.format("Worker has no initial context for session: %s", getSessionId()));
+
+        try {
+            startTransaction(gremlinContext);
+            try {
+                while (true) {
+                    // schedule timeout for the current request from the queue
+                    final long seto = gremlinContext.getRequestTimeout();
+                    requestCancelFuture = scheduledExecutorService.schedule(
+                            () -> this.triggerTimeout(seto, false),
+                            seto, TimeUnit.MILLISECONDS);
+
+                    process(gremlinContext);
+
+                    // work is done within the timeout period so cancel it
+                    cancelRequestTimeout();
+
+                    gremlinContext = queue.take();
+                }
+            } catch (Exception ex) {
+                // stop accepting requests on this worker since it is heading to close()
+                ending.set(true);
+
+                // the current context gets its exception handled...
+                handleException(gremlinContext, ex);
+            }
+        } catch (RexsterException rexex) {
+            // remaining work items in the queue are ignored since this worker is closing. must send
+            // back some sort of response to satisfy the client. writeAndFlush code is different than
+            // the ResponseMessage as we don't want the message to be "final" for the Context. that
+            // status must be reserved for the message that caused the error
+            for (Context gctx : queue) {
+                gctx.writeAndFlush(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(gctx.getRequestMessage())
+                        .code(ResponseStatusCode.SERVER_ERROR)
+                        .statusMessage(String.format(
+                                "An earlier request [%s] failed prior to this one having a chance to execute",
+                                gremlinContext.getRequestMessage().getRequestId())).create());
+            }
+
+            // exception should trigger a rollback in the session. a more focused rollback may have occurred
+            // during process() and the related result iteration IF transaction management was enabled on
+            // the request
+            closeTransactionSafely(Transaction.Status.ROLLBACK);
+
+            logger.warn(rexex.getMessage(), rexex);
+            gremlinContext.writeAndFlush(rexex.getResponseMessage());
+        } finally {
+            close();
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        ending.set(true);
+        cancelRequestTimeout();
+        super.close();
+        logger.info("Session {} closed", getSessionId());
+    }
+
+    private void cancelRequestTimeout() {
+        if (requestCancelFuture != null && !requestCancelFuture.isDone())
+            requestCancelFuture.cancel(true);
+    }
+
+    @Override
+    protected Bindings getWorkerBindings() throws RexsterException {
+        if (null == bindings)
+            bindings = super.getWorkerBindings();
+        return this.bindings;
+    }
+
+    protected GremlinExecutor initializeGremlinExecutor(final Context gremlinContext) {
+        final Settings settings = gremlinContext.getSettings();
+        final ExecutorService executor = gremlinContext.getGremlinExecutor().getExecutorService();
+        final boolean useGlobalFunctionCache = settings.useGlobalFunctionCacheForSessions;
+
+        // these initial settings don't matter so much as we don't really execute things through the
+        // GremlinExecutor directly. Just doing all this setup to make GremlinExecutor do the work of
+        // rigging up the GremlinScriptEngineManager.
+        final GremlinExecutor.Builder gremlinExecutorBuilder = GremlinExecutor.build()
+                .evaluationTimeout(settings.getEvaluationTimeout())
+                .executorService(executor)
+                .globalBindings(graphManager.getAsBindings())
+                .scheduledExecutorService(scheduledExecutorService);
+
+        settings.scriptEngines.forEach((k, v) -> {
+            // use plugins if they are present
+            if (!v.plugins.isEmpty()) {
+                // make sure that server related classes are available at init. the LifeCycleHook stuff will be
+                // added explicitly via configuration using GremlinServerGremlinModule in the yaml. need to override
+                // scriptengine settings with SessionOpProcessor specific ones as the processing for sessions is
+                // different and a global setting may not make sense for a session
+                if (v.plugins.containsKey(GroovyCompilerGremlinPlugin.class.getName())) {
+                    v.plugins.get(GroovyCompilerGremlinPlugin.class.getName()).put(CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, useGlobalFunctionCache);
+                } else {
+                    final Map<String,Object> pluginConf = new HashMap<>();
+                    pluginConf.put(CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, useGlobalFunctionCache);
+                    v.plugins.put(GroovyCompilerGremlinPlugin.class.getName(), pluginConf);
+                }
+
+                gremlinExecutorBuilder.addPlugins(k, v.plugins);
+            }
+        });
+
+        return gremlinExecutorBuilder.create();
+    }
+}
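
Review note: the core of `MultiRexster` is a single worker draining a `BlockingQueue` so that every task runs serially on one thread. The following is a simplified sketch of that pattern only, not the class itself. `SerialWorker`, the `String` tasks standing in for `Context`, the `STOP` sentinel and the boolean return of `addTask` are all assumptions made for illustration. The real class ends through exceptions and timeouts rather than a sentinel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified analogue of the queue-draining worker; String tasks stand in for Context.
class SerialWorker implements Runnable {
    static final String STOP = "__stop__"; // illustrative sentinel used to end the loop

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean ending = new AtomicBoolean(false);
    final List<String> processed = new ArrayList<>();

    SerialWorker(final String firstTask) {
        // the first task is queued before the worker is ever started
        addTask(firstTask);
    }

    boolean acceptingTasks() {
        return !ending.get();
    }

    // returns false when the task is rejected because the worker is closing
    boolean addTask(final String task) {
        return acceptingTasks() && queue.offer(task);
    }

    @Override
    public void run() {
        String task = queue.poll();
        if (null == task)
            throw new IllegalStateException("Worker has no initial task");

        try {
            while (!STOP.equals(task)) {
                processed.add(task);   // tasks run one at a time, in arrival order
                task = queue.take();   // block until the next task arrives
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } finally {
            ending.set(true);          // stop accepting tasks once the loop exits
        }
    }
}
```

Because all work happens inside one `run()` invocation, tasks that share the worker also share its thread, which is the property the class Javadoc relies on for multi-request transactions. Returning a boolean from `addTask` is one possible way to address the "explicitly reject request" todo, though the commit itself leaves that open.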
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
new file mode 100644
index 0000000..70869dd
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.channel.Channel;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.server.Context;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * The {@code Rexster} interface is essentially a form of "worker", named for Gremlin's trusty canine robot friend
+ * who represents the "server" aspect of TinkerPop. In the most generic sense, {@code Rexster} implementations take
+ * tasks, each in the form of a Gremlin Server {@link Context}, and process them, which typically means "execute
+ * some Gremlin". Implementations have a fair amount of flexibility in how they go about doing this, but the
+ * {@link SingleRexster} and the {@link MultiRexster} should handle most cases quite well and are extensible for
+ * providers.
+ */
+public interface Rexster extends Runnable {
+    /**
+     * Adds a task for Rexster to complete.
+     */
+    void addTask(final Context gremlinContext);
+
+    /**
+     * Sets a reference to the job that will cancel this Rexster if it exceeds its timeout period.
+     */
+    void setSessionCancelFuture(final ScheduledFuture<?> f);
+
+    /**
+     * Sets a reference to the job itself that is running this Rexster.
+     */
+    void setSessionFuture(final Future<?> f);
+
+    /**
+     * Provides a general way to tell Rexster that it has exceeded some timeout condition.
+     */
+    void triggerTimeout(final long timeout, boolean causedBySession);
+
+    /**
+     * Determines if the supplied {@code Channel} object is the same as the one bound to the {@code Session}.
+     */
+    boolean isBoundTo(final Channel channel);
+
+    /**
+     * Determines if this Rexster can accept additional tasks or not.
+     */
+    boolean acceptingTasks();
+
+    public class RexsterException extends Exception {
+        private final ResponseMessage responseMessage;
+
+        public RexsterException(final String message, final ResponseMessage responseMessage) {
+            super(message);
+            this.responseMessage = responseMessage;
+        }
+
+        public RexsterException(final String message, final Throwable cause, final ResponseMessage responseMessage) {
+            super(message, cause);
+            this.responseMessage = responseMessage;
+        }
+
+        public ResponseMessage getResponseMessage() {
+            return this.responseMessage;
+        }
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
index d58553d..31dabd0 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
@@ -21,14 +21,14 @@ package org.apache.tinkerpop.gremlin.server.handler;
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http.HttpMessage;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 import org.apache.tinkerpop.gremlin.server.Settings;
-import org.apache.tinkerpop.gremlin.server.handler.HttpBasicAuthenticationHandler;
-import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler;
-import org.apache.tinkerpop.gremlin.server.handler.WebSocketHandlerUtil;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 
+import static org.apache.tinkerpop.gremlin.server.AbstractChannelizer.PIPELINE_AUTHORIZER;
 import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_AUTHENTICATOR;
 
 /**
@@ -39,18 +39,33 @@ public class SaslAndHttpBasicAuthenticationHandler extends SaslAuthenticationHan
 
     private final String HTTP_AUTH = "http-authentication";
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #SaslAndHttpBasicAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+     */
+    @Deprecated
     public SaslAndHttpBasicAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
-        super(authenticator, settings);
+        this(authenticator, null, settings);
+    }
+
+    public SaslAndHttpBasicAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+        super(authenticator, authorizer, settings);
     }
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object obj) throws Exception {
         if (obj instanceof HttpMessage && !WebSocketHandlerUtil.isWebSocket((HttpMessage)obj)) {
-            ChannelPipeline pipeline = ctx.pipeline();
+            final ChannelPipeline pipeline = ctx.pipeline();
             if (null != pipeline.get(HTTP_AUTH)) {
                 pipeline.remove(HTTP_AUTH);
             }
             pipeline.addAfter(PIPELINE_AUTHENTICATOR, HTTP_AUTH, new HttpBasicAuthenticationHandler(authenticator, this.settings));
+
+            if (authorizer != null) {
+                final ChannelInboundHandlerAdapter authorizationHandler = new HttpBasicAuthorizationHandler(authorizer);
+                pipeline.remove(PIPELINE_AUTHORIZER);
+                pipeline.addAfter(HTTP_AUTH, PIPELINE_AUTHORIZER, authorizationHandler);
+            }
+
             ctx.fireChannelRead(obj);
         } else {
             super.channelRead(ctx, obj);
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
index e8216a6..55da853 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
@@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.server.Settings;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,8 +60,16 @@ public class SaslAuthenticationHandler extends AbstractAuthenticationHandler {
 
     protected final Settings settings;
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #SaslAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+     */
+    @Deprecated
     public SaslAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
-        super(authenticator);
+        this(authenticator, null, settings);
+    }
+
+    public SaslAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+        super(authenticator, authorizer);
         this.settings = settings;
     }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java
new file mode 100644
index 0000000..05d5dff
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+
+/**
+ * A simple {@link Rexster} implementation that accepts one request, processes it and exits.
+ */
+public class SingleRexster extends AbstractRexster {
+    private static final Logger logger = LoggerFactory.getLogger(SingleRexster.class);
+    protected final Context gremlinContext;
+
+    SingleRexster(final Context gremlinContext, final String sessionId,
+                  final ConcurrentMap<String, Rexster> sessions) {
+        super(gremlinContext, sessionId,true, sessions);
+        this.gremlinContext = gremlinContext;
+    }
+
+    /**
+     * The {@code SingleRexster} can only process one request so the initial construction of it already has the
+     * request in it and no more can be added, therefore this method always returns {@code false}.
+     */
+    @Override
+    public boolean acceptingTasks() {
+        return false;
+    }
+
+    @Override
+    public void addTask(final Context gremlinContext) {
+        throw new UnsupportedOperationException("SingleRexster doesn't accept tasks beyond the one provided to the constructor");
+    }
+
+    @Override
+    public void run() {
+        try {
+            startTransaction(gremlinContext);
+            process(gremlinContext);
+        } catch (RexsterException we) {
+            logger.warn(we.getMessage(), we);
+
+            // should have already rolled back - this is a safety valve
+            closeTransactionSafely(gremlinContext, Transaction.Status.ROLLBACK);
+
+            gremlinContext.writeAndFlush(we.getResponseMessage());
+        } finally {
+            close();
+        }
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
new file mode 100644
index 0000000..7483062
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.server.Channelizer;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.Rexster.RexsterException;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Handler for websockets to be used with the {@link UnifiedChannelizer}.
+ */
+@ChannelHandler.Sharable
+public class UnifiedHandler extends SimpleChannelInboundHandler<RequestMessage> {
+    private static final Logger logger = LoggerFactory.getLogger(UnifiedHandler.class);
+
+    protected final Settings settings;
+    protected final GraphManager graphManager;
+    protected final GremlinExecutor gremlinExecutor;
+    protected final ScheduledExecutorService scheduledExecutorService;
+    protected final ExecutorService executorService;
+    protected final Channelizer channelizer;
+
+    protected final ConcurrentMap<String, Rexster> sessions = new ConcurrentHashMap<>();
+
+    /**
+     * This may or may not be the full set of invalid binding keys.  It is dependent on the static imports made to
+     * Gremlin Server.  This should get rid of the worst offenders though and provide a good message back to the
+     * calling client.
+     * <p/>
+     * Use of {@code toUpperCase()} on the accessor values of {@link T} solves an issue where the {@code ScriptEngine}
+     * ignores private scope on {@link T} and imports static fields.
+     */
+    protected static final Set<String> INVALID_BINDINGS_KEYS = new HashSet<>();
+
+    static {
+        INVALID_BINDINGS_KEYS.addAll(Arrays.asList(
+                T.id.name(), T.key.name(),
+                T.label.name(), T.value.name(),
+                T.id.getAccessor(), T.key.getAccessor(),
+                T.label.getAccessor(), T.value.getAccessor(),
+                T.id.getAccessor().toUpperCase(), T.key.getAccessor().toUpperCase(),
+                T.label.getAccessor().toUpperCase(), T.value.getAccessor().toUpperCase()));
+
+        for (Column enumItem : Column.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Order enumItem : Order.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Operator enumItem : Operator.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Scope enumItem : Scope.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Pop enumItem : Pop.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+    }
+
+    public UnifiedHandler(final Settings settings, final GraphManager graphManager,
+                          final GremlinExecutor gremlinExecutor,
+                          final ScheduledExecutorService scheduledExecutorService,
+                          final Channelizer channelizer) {
+        this.settings = settings;
+        this.graphManager = graphManager;
+        this.gremlinExecutor = gremlinExecutor;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.executorService = gremlinExecutor.getExecutorService();
+        this.channelizer = channelizer;
+    }
+
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final RequestMessage msg) throws Exception {
+        try {
+            try {
+                validateRequest(msg, graphManager);
+            } catch (RexsterException we) {
+                ctx.writeAndFlush(we.getResponseMessage());
+                return;
+            }
+
+            final Optional<String> optSession = msg.optionalArgs(Tokens.ARGS_SESSION);
+            final String sessionId = optSession.orElseGet(() -> UUID.randomUUID().toString());
+
+            // still using GremlinExecutor here in the Context so that this object doesn't need to immediately
+            // change, but also because GremlinExecutor/ScriptEngine config is all rigged up into the server nicely
+            // right now. when the UnifiedChannelizer is "ready" we can factor out the GremlinExecutor
+            final Context gremlinContext = new Context(msg, ctx, settings, graphManager,
+                    gremlinExecutor, scheduledExecutorService);
+
+            if (sessions.containsKey(sessionId)) {
+                final Rexster rexster = sessions.get(sessionId);
+
+                // check if the session is still accepting requests - if not block further requests
+                if (!rexster.acceptingTasks()) {
+                    final String sessionClosedMessage = String.format(
+                            "Session %s is no longer accepting requests as it has been closed", sessionId);
+                    final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(sessionClosedMessage).create();
+                    ctx.writeAndFlush(response);
+                    return;
+                }
+
+                // check if the session is bound to this channel, thus one client per session
+                if (!rexster.isBoundTo(gremlinContext.getChannelHandlerContext().channel())) {
+                    final String sessionClosedMessage = String.format("Session %s is not bound to the connecting client", sessionId);
+                    final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(sessionClosedMessage).create();
+                    ctx.writeAndFlush(response);
+                    return;
+                }
+
+                rexster.addTask(gremlinContext);
+            } else {
+                final Rexster rexster = optSession.isPresent() ?
+                        createMulti(gremlinContext, sessionId) : createSingle(gremlinContext, sessionId);
+                final Future<?> sessionFuture = executorService.submit(rexster);
+                rexster.setSessionFuture(sessionFuture);
+                sessions.put(sessionId, rexster);
+
+                // determine the max session life. for multi that's going to be "session life" and for single that
+                // will be the span of the request timeout
+                final long seto = gremlinContext.getRequestTimeout();
+                final long sessionLife = optSession.isPresent() ? settings.sessionLifetimeTimeout : seto;
+
+                // a timeout is only scheduled when the request timeout is greater than zero
+                if (seto > 0) {
+                    final ScheduledFuture<?> sessionCancelFuture =
+                            scheduledExecutorService.schedule(
+                                    () -> rexster.triggerTimeout(sessionLife, optSession.isPresent()),
+                                    sessionLife, TimeUnit.MILLISECONDS);
+                    rexster.setSessionCancelFuture(sessionCancelFuture);
+                }
+            }
+        } finally {
+            ReferenceCountUtil.release(msg);
+        }
+    }
+
+    protected void validateRequest(final RequestMessage message, final GraphManager graphManager) throws RexsterException {
+        if (!message.optionalArgs(Tokens.ARGS_GREMLIN).isPresent()) {
+            final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", message.getOp(), Tokens.ARGS_GREMLIN);
+            throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+        }
+
+        if (message.optionalArgs(Tokens.ARGS_BINDINGS).isPresent()) {
+            final Map bindings = (Map) message.getArgs().get(Tokens.ARGS_BINDINGS);
+            if (IteratorUtils.anyMatch(bindings.keySet().iterator(), k -> null == k || !(k instanceof String))) {
+                final String msg = String.format("The [%s] message is using one or more invalid binding keys - they must be of type String and cannot be null", Tokens.OPS_EVAL);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            final Set<String> badBindings = IteratorUtils.set(IteratorUtils.<String>filter(bindings.keySet().iterator(), INVALID_BINDINGS_KEYS::contains));
+            if (!badBindings.isEmpty()) {
+                final String msg = String.format("The [%s] message supplies one or more invalid parameters key of [%s] - these are reserved names.", Tokens.OPS_EVAL, badBindings);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            // ignore control bindings that get passed in with the "#jsr223" prefix - those aren't used in compilation
+            if (IteratorUtils.count(IteratorUtils.filter(bindings.keySet().iterator(), k -> !k.toString().startsWith("#jsr223"))) > settings.maxParameters) {
+                final String msg = String.format("The [%s] message contains %s bindings which is more than is allowed by the server %s configuration",
+                        Tokens.OPS_EVAL, bindings.size(), settings.maxParameters);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+        }
+
+        // bytecode must have an alias defined
+        if (message.getOp().equals(Tokens.OPS_BYTECODE)) {
+            final Optional<Map<String, String>> aliases = message.optionalArgs(Tokens.ARGS_ALIASES);
+            if (!aliases.isPresent()) {
+                final String msg = String.format("A message with [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            if (aliases.get().size() != 1 || !aliases.get().containsKey(Tokens.VAL_TRAVERSAL_SOURCE_ALIAS)) {
+                final String msg = String.format("A message with [%s] op code requires the [%s] argument to be a Map containing one alias assignment named '%s'.",
+                        Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES, Tokens.VAL_TRAVERSAL_SOURCE_ALIAS);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            final String traversalSourceBindingForAlias = aliases.get().values().iterator().next();
+            if (!graphManager.getTraversalSourceNames().contains(traversalSourceBindingForAlias)) {
+                final String msg = String.format("The traversal source [%s] for alias [%s] is not configured on the server.", traversalSourceBindingForAlias, Tokens.VAL_TRAVERSAL_SOURCE_ALIAS);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+        }
+    }
+
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
+        // only need to handle this event if the idle monitor is on
+        if (!channelizer.supportsIdleMonitor()) return;
+
+        if (evt instanceof IdleStateEvent) {
+            final IdleStateEvent e = (IdleStateEvent) evt;
+
+            // if no requests (reader) then close, if no writes from server to client then ping. clients should
+            // periodically ping the server, but coming from this direction allows the server to kill channels that
+            // have dead clients on the other end
+            if (e.state() == IdleState.READER_IDLE) {
+                logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel().id().asShortText());
+                ctx.close();
+            } else if (e.state() == IdleState.WRITER_IDLE && settings.keepAliveInterval > 0) {
+                logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel().id().asShortText());
+                ctx.writeAndFlush(channelizer.createIdleDetectionMessage());
+            }
+        }
+    }
+
+    protected Rexster createSingle(final Context gremlinContext, final String sessionId) {
+        return new SingleRexster(gremlinContext, sessionId, sessions);
+    }
+
+    protected Rexster createMulti(final Context gremlinContext, final String sessionId) {
+        return new MultiRexster(gremlinContext, sessionId, sessions);
+    }
+
+    public boolean isActiveSession(final String sessionId) {
+        return sessions.containsKey(sessionId);
+    }
+
+    public int getActiveSessionCount() {
+        return sessions.size();
+    }
+}
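
The bindings checks in validateRequest() above can be sketched without any TinkerPop dependency. This is a minimal illustration, not the committed code: the class name BindingCheckSketch, the validate() helper, and the RESERVED set are hypothetical, and RESERVED holds only an illustrative subset of the names the handler collects from T, Column, Order, Operator, Scope and Pop. It returns an error string rather than throwing, so the three rules are easy to exercise in isolation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the bindings checks in validateRequest(); not TinkerPop API.
class BindingCheckSketch {
    // Illustrative subset only; the handler builds its set from several Gremlin enums.
    static final Set<String> RESERVED = new HashSet<>(Arrays.asList(
            "id", "key", "label", "value", "ID", "KEY", "LABEL", "VALUE"));

    // Returns an error description, or empty when the bindings pass all three checks.
    static Optional<String> validate(final Map<?, ?> bindings, final int maxParameters) {
        // 1. every key must be a non-null String (null fails the instanceof test)
        for (final Object k : bindings.keySet()) {
            if (!(k instanceof String)) return Optional.of("invalid binding key type");
        }

        // 2. no key may collide with a reserved name
        final Set<String> bad = new TreeSet<>();
        for (final Object k : bindings.keySet()) {
            if (RESERVED.contains(k)) bad.add((String) k);
        }
        if (!bad.isEmpty()) return Optional.of("reserved binding keys: " + bad);

        // 3. enforce the parameter limit, ignoring "#jsr223" control bindings
        final long count = bindings.keySet().stream()
                .filter(k -> !k.toString().startsWith("#jsr223"))
                .count();
        if (count > maxParameters) return Optional.of("too many bindings: " + count);

        return Optional.empty();
    }
}
```

As in the handler, a key with the "#jsr223" prefix does not count toward the limit, so a request carrying one ordinary binding plus one control binding still passes when the limit is 1.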
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
index e4f79b4..a93e1c4 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
@@ -28,9 +28,9 @@ import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
 import org.apache.tinkerpop.gremlin.server.channel.WsAndHttpChannelizer;
 import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
 
+import static org.apache.tinkerpop.gremlin.server.AbstractChannelizer.PIPELINE_HTTP_AGGREGATOR;
 import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_AUTHENTICATOR;
 import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_REQUEST_HANDLER;
-import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_HTTP_RESPONSE_ENCODER;
 
 /**
  * A ChannelInboundHandlerAdapter for use with {@link WsAndHttpChannelizer} that toggles between WebSockets
@@ -66,11 +66,11 @@ public class WsAndHttpChannelizerHandler extends ChannelInboundHandlerAdapter {
                 pipeline.remove(PIPELINE_REQUEST_HANDLER);
                 final ChannelHandler authenticator = pipeline.get(PIPELINE_AUTHENTICATOR);
                 pipeline.remove(PIPELINE_AUTHENTICATOR);
-                pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_AUTHENTICATOR, authenticator);
+                pipeline.addAfter(PIPELINE_HTTP_AGGREGATOR, PIPELINE_AUTHENTICATOR, authenticator);
                 pipeline.addAfter(PIPELINE_AUTHENTICATOR, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
             } else {
                 pipeline.remove(PIPELINE_REQUEST_HANDLER);
-                pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
+                pipeline.addAfter(PIPELINE_HTTP_AGGREGATOR, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
             }
         }
         ctx.fireChannelRead(obj);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
index 55f89e6..deb1734 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
@@ -18,7 +18,14 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizerIntegrateTest;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizerIntegrateTest;
+import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
+import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizerIntegrateTest;
 import org.apache.tinkerpop.gremlin.server.op.OpLoader;
+import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
+import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -96,16 +103,28 @@ public abstract class AbstractGremlinServerIntegrationTest {
         startServer(settings);
     }
 
+    private boolean shouldTestUnified() {
+        // ignore all tests in the UnifiedChannelizerIntegrateTest package as they are already rigged to test
+        // over the various channelizer implementations
+        return Boolean.parseBoolean(System.getProperty("testUnified", "false")) &&
+                !this.getClass().getPackage().equals(UnifiedChannelizerIntegrateTest.class.getPackage());
+    }
+
     public void startServer(final Settings settings) throws Exception {
         if (null == settings) {
             startServer();
         } else {
-            final Settings overridenSettings = overrideSettings(settings);
-            ServerTestHelper.rewritePathsInGremlinServerSettings(overridenSettings);
+            final Settings oSettings = overrideSettings(settings);
+
+            if (shouldTestUnified()) {
+                oSettings.channelizer = UnifiedChannelizer.class.getName();
+            }
+
+            ServerTestHelper.rewritePathsInGremlinServerSettings(oSettings);
             if (GREMLIN_SERVER_EPOLL) {
-                overridenSettings.useEpollEventLoop = true;
+                oSettings.useEpollEventLoop = true;
             }
-            this.server = new GremlinServer(overridenSettings);
+            this.server = new GremlinServer(oSettings);
             server.start().join();
         }
     }
@@ -119,6 +138,10 @@ public abstract class AbstractGremlinServerIntegrationTest {
             overriddenSettings.useEpollEventLoop = true;
         }
 
+        if (shouldTestUnified()) {
+            overriddenSettings.channelizer = UnifiedChannelizer.class.getName();
+        }
+
         this.server = new GremlinServer(overriddenSettings);
 
         server.start().join();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
index f8956e2..e685932 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
@@ -47,7 +47,8 @@ public class ContextTest {
 
     private final ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
     private final RequestMessage request = RequestMessage.build("test").create();
-    private final Context context = new Context(request, ctx, null, null, null, null);
+    private final Settings settings = new Settings();
+    private final Context context = new Context(request, ctx, settings, null, null, null);
     private final Log4jRecordingAppender recordingAppender = new Log4jRecordingAppender();
 
     private Level originalLogLevel;
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 3c1d308..c11cad8 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -42,6 +42,7 @@ import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
 import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.handler.OpExecutorHandler;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -99,6 +100,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assume.assumeThat;
 import static org.mockito.Mockito.verify;
 
 /**
@@ -1645,16 +1647,60 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                 final CompletableFuture<List<Result>> futureThird = third.get().all();
                 final CompletableFuture<List<Result>> futureFourth = fourth.get().all();
 
-                assertFutureTimeout(futureFirst);
-                assertFutureTimeout(futureSecond);
-                assertFutureTimeout(futureThird);
-                assertFutureTimeout(futureFourth);
+                // there is slightly different assertion logic with UnifiedChannelizer given differences in session
+                // behavior where UnifiedChannelizer sessions won't continue processing in the face of a timeout and
+                // a new session will need to be created
+                if (server.getServerGremlinExecutor().getSettings().channelizer.equals(UnifiedChannelizer.class.getName())) {
+                    // first times out and the rest get SERVER_ERROR
+                    try {
+                        futureFirst.get();
+                        fail("Should have timed out");
+                    } catch (Exception ex) {
+                        final Throwable root = ExceptionUtils.getRootCause(ex);
+                        assertThat(root, instanceOf(ResponseException.class));
+                        assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode());
+                        assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
+                    }
+
+                    assertFutureTimeoutUnderUnified(futureSecond);
+                    assertFutureTimeoutUnderUnified(futureThird);
+                    assertFutureTimeoutUnderUnified(futureFourth);
+                } else {
+                    assertFutureTimeout(futureFirst);
+                    assertFutureTimeout(futureSecond);
+                    assertFutureTimeout(futureThird);
+                    assertFutureTimeout(futureFourth);
+                }
             }
         } finally {
             cluster.close();
         }
     }
 
+    private void assertFutureTimeoutUnderUnified(final CompletableFuture<List<Result>> f) {
+        try {
+            f.get();
+            fail("Should have timed out");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(ResponseException.class));
+            assertEquals(ResponseStatusCode.SERVER_ERROR, ((ResponseException) root).getResponseStatusCode());
+            assertThat(root.getMessage(), allOf(startsWith("An earlier request"), endsWith("failed prior to this one having a chance to execute")));
+        }
+    }
+
+    private void assertFutureTimeout(final CompletableFuture<List<Result>> f) {
+        try {
+            f.get();
+            fail("Should have timed out");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(ResponseException.class));
+            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode());
+            assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
+        }
+    }
+
     @Test
     public void shouldCloseAllClientsOnCloseOfCluster() throws Exception {
         final Cluster cluster = TestClientFactory.open();
@@ -1780,20 +1826,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
 
     }
 
-    private void assertFutureTimeout(final CompletableFuture<List<Result>> futureFirst) {
-        try
-        {
-            futureFirst.get();
-            fail("Should have timed out");
-        }
-        catch (Exception ex)
-        {
-            final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root, instanceOf(ResponseException.class));
-            assertThat(root.getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 250 ms"));
-        }
-    }
-
     @Test
     public void shouldClusterReadFileFromResources() throws Exception {
         final Cluster cluster = Cluster.open(TestClientFactory.RESOURCE_PATH);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
index a9d7228..e81f889 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
@@ -31,10 +31,10 @@ import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
 import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
-import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.Krb5Authenticator;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
@@ -81,7 +81,7 @@ public class GremlinServerAuditLogDeprecatedIntegrateTest extends AbstractGremli
         rootLogger.addAppender(recordingAppender);
 
         try {
-            final String moduleBaseDir = System.getProperty("basedir");
+            final String moduleBaseDir = System.getProperty("basedir", ".");
             final String authConfigName = moduleBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
             System.setProperty("java.security.auth.login.config", authConfigName);
             kdcServer = new KdcFixture(moduleBaseDir);
@@ -139,6 +139,7 @@ public class GremlinServerAuditLogDeprecatedIntegrateTest extends AbstractGremli
                 settings.host = "localhost";
                 settings.channelizer = HttpChannelizer.class.getName();
                 authSettings.authenticator = SimpleAuthenticator.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
                 break;
         }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
index 69bb974..28ad1e1 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.Krb5Authenticator;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
@@ -81,7 +82,7 @@ public class GremlinServerAuditLogIntegrateTest extends AbstractGremlinServerInt
         rootLogger.addAppender(recordingAppender);
 
         try {
-            final String moduleBaseDir = System.getProperty("basedir");
+            final String moduleBaseDir = System.getProperty("basedir", ".");
             final String authConfigName = moduleBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
             System.setProperty("java.security.auth.login.config", authConfigName);
             kdcServer = new KdcFixture(moduleBaseDir);
@@ -139,6 +140,7 @@ public class GremlinServerAuditLogIntegrateTest extends AbstractGremlinServerInt
                 settings.host = "localhost";
                 settings.channelizer = HttpChannelizer.class.getName();
                 authSettings.authenticator = SimpleAuthenticator.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
                 break;
         }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
index 4ec467e..7d10a3a 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
@@ -65,7 +65,7 @@ public class GremlinServerAuthKrb5IntegrateTest extends AbstractGremlinServerInt
         handlerLogger.setLevel(Level.OFF);
 
         try {
-            final String projectBaseDir = System.getProperty("basedir");
+            final String projectBaseDir = System.getProperty("basedir", ".");
             final String authConfigName = projectBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
             System.setProperty("java.security.auth.login.config", authConfigName);
             kdcServer = new KdcFixture(projectBaseDir);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
index 5614123..00eacda 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.authz.AllowListAuthorizer;
 import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.gremlin.util.function.Lambda;
@@ -116,10 +117,12 @@ public class GremlinServerAuthzIntegrateTest extends AbstractGremlinServerIntegr
             case "shouldFailAuthorizeWithHttpTransport":
             case "shouldKeepAuthorizingWithHttpTransport":
                 settings.channelizer = HttpChannelizer.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 break;
             case "shouldAuthorizeWithAllowAllAuthenticatorAndHttpTransport":
                 settings.channelizer = HttpChannelizer.class.getName();
                 authSettings.authenticator = AllowAllAuthenticator.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 authSettings.config = null;
                 final String fileHttp = Objects.requireNonNull(getClass().getClassLoader().getResource(yamlHttpName)).getFile();
                 authzSettings.config.put(AllowListAuthorizer.KEY_AUTHORIZATION_ALLOWLIST, fileHttp);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
index fadc5e2..05cd939 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
@@ -36,6 +36,7 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -133,6 +134,7 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra
     private void configureForAuthentication(final Settings settings) {
         final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
         authSettings.authenticator = SimpleAuthenticator.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
 
         // use a credentials graph with two users in it: stephen/password and marko/rainbow-dash
         final Map<String,Object> authConfig = new HashMap<>();
@@ -147,7 +149,7 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra
         authSettings.authenticator = SimpleAuthenticator.class.getName();
 
         //Add basic auth handler to make sure the reflection code path works.
-        authSettings.authenticationHandler = HttpBasicAuthenticationHandler.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
 
         // use a credentials graph with two users in it: stephen/password and marko/rainbow-dash
         final Map<String,Object> authConfig = new HashMap<>();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 4f0922e..270bed2 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.SimpleSandboxExtension;
 import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
+import org.apache.tinkerpop.gremlin.server.handler.UnifiedHandler;
 import org.apache.tinkerpop.gremlin.structure.RemoteGraph;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -58,7 +59,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -80,6 +80,7 @@ import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalS
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.AllOf.allOf;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
 import static org.hamcrest.core.StringStartsWith.startsWith;
@@ -115,9 +116,11 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
 
         if (name.getMethodName().equals("shouldPingChannelIfClientDies") ||
                 name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
-            final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
-            previousLogLevel = webSocketClientHandlerLogger.getLevel();
-            webSocketClientHandlerLogger.setLevel(Level.INFO);
+            final org.apache.log4j.Logger opSelectorHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
+            final org.apache.log4j.Logger unifiedHandlerLogger = org.apache.log4j.Logger.getLogger(UnifiedHandler.class);
+            previousLogLevel = opSelectorHandlerLogger.getLevel();
+            opSelectorHandlerLogger.setLevel(Level.INFO);
+            unifiedHandlerLogger.setLevel(Level.INFO);
         }
 
         rootLogger.addAppender(recordingAppender);
@@ -129,8 +132,10 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
 
         if (name.getMethodName().equals("shouldPingChannelIfClientDies")||
                 name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
-            final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
-            webSocketClientHandlerLogger.setLevel(previousLogLevel);
+            final org.apache.log4j.Logger opSelectorHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
+            opSelectorHandlerLogger.setLevel(previousLogLevel);
+            final org.apache.log4j.Logger unifiedHandlerLogger = org.apache.log4j.Logger.getLogger(UnifiedHandler.class);
+            unifiedHandlerLogger.setLevel(previousLogLevel);
         }
 
         rootLogger.removeAppender(recordingAppender);
@@ -145,6 +150,8 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
         switch (nameOfTest) {
             case "shouldProvideBetterExceptionForMethodCodeTooLarge":
                 settings.maxContentLength = 4096000;
+
+                // OpProcessor setting
                 final Settings.ProcessorSettings processorSettingsBig = new Settings.ProcessorSettings();
                 processorSettingsBig.className = StandardOpProcessor.class.getName();
                 processorSettingsBig.config = new HashMap<String,Object>() {{
@@ -152,6 +159,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                 }};
                 settings.processors.clear();
                 settings.processors.add(processorSettingsBig);
+
+                // Unified setting
+                settings.maxParameters = Integer.MAX_VALUE;
                 break;
             case "shouldRespectHighWaterMarkSettingAndSucceed":
                 settings.writeBufferHighWaterMark = 64;
@@ -182,6 +192,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                 settings.scriptEngines.get("gremlin-groovy").config = getScriptEngineConfForBaseScript();
                 break;
             case "shouldReturnInvalidRequestArgsWhenBindingCountExceedsAllowable":
+                // OpProcessor settings
                 final Settings.ProcessorSettings processorSettingsSmall = new Settings.ProcessorSettings();
                 processorSettingsSmall.className = StandardOpProcessor.class.getName();
                 processorSettingsSmall.config = new HashMap<String,Object>() {{
@@ -189,6 +200,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                 }};
                 settings.processors.clear();
                 settings.processors.add(processorSettingsSmall);
+
+                // Unified settings
+                settings.maxParameters = 1;
                 break;
             case "shouldTimeOutRemoteTraversal":
                 settings.evaluationTimeout = 500;
@@ -699,7 +713,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
     public void shouldReceiveFailureTimeOutOnScriptEval() throws Exception {
         try (SimpleClient client = TestClientFactory.createWebSocketClient()){
             final List<ResponseMessage> responses = client.submit("Thread.sleep(3000);'some-stuff-that-should not return'");
-            assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 1000 ms"));
+            assertThat(responses.get(0).getStatus().getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("1000 ms")));
 
             // validate that we can still send messages to the server
             assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
@@ -715,7 +729,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                     .addArg(Tokens.ARGS_GREMLIN, "Thread.sleep(3000);'some-stuff-that-should not return'")
                     .create();
             final List<ResponseMessage> responses = client.submit(msg);
-            assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 100 ms"));
+            assertThat(responses.get(0).getStatus().getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("100 ms")));
 
             // validate that we can still send messages to the server
             assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 3dc58ae..fe9ff99 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.driver.simple.SimpleClient;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.junit.After;
@@ -94,11 +95,16 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             case "shouldHaveTheSessionTimeout":
             case "shouldCloseSessionOnceOnRequest":
                 settings.processors.clear();
+
+                // OpProcessor setting
                 final Settings.ProcessorSettings processorSettings = new Settings.ProcessorSettings();
                 processorSettings.className = SessionOpProcessor.class.getCanonicalName();
                 processorSettings.config = new HashMap<>();
                 processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L);
                 settings.processors.add(processorSettings);
+
+                // Unified setting
+                settings.sessionLifetimeTimeout = 3000L;
                 break;
             case "shouldCloseSessionOnClientClose":
                 clearNeo4j(settings);
@@ -106,13 +112,27 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             case "shouldEnsureSessionBindingsAreThreadSafe":
                 settings.threadPoolWorker = 2;
                 break;
+            case "shouldUseGlobalFunctionCache":
+                // OpProcessor settings are good by default
+                // UnifiedHandler settings
+                settings.useCommonEngineForSessions = false;
+                settings.useGlobalFunctionCacheForSessions = true;
+
+                break;
             case "shouldNotUseGlobalFunctionCache":
                 settings.processors.clear();
+
+                // OpProcessor settings
                 final Settings.ProcessorSettings processorSettingsForDisableFunctionCache = new Settings.ProcessorSettings();
                 processorSettingsForDisableFunctionCache.className = SessionOpProcessor.class.getCanonicalName();
                 processorSettingsForDisableFunctionCache.config = new HashMap<>();
                 processorSettingsForDisableFunctionCache.config.put(SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, false);
                 settings.processors.add(processorSettingsForDisableFunctionCache);
+
+                // UnifiedHandler settings
+                settings.useCommonEngineForSessions = false;
+                settings.useGlobalFunctionCacheForSessions = false;
+
                 break;
             case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient":
             case "shouldExecuteInSessionWithTransactionManagement":
@@ -124,6 +144,11 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         return settings;
     }
 
+    private boolean isUsingUnifiedChannelizer() {
+        return server.getServerGremlinExecutor().
+                getSettings().channelizer.equals(UnifiedChannelizer.class.getName());
+    }
+
     private static void clearNeo4j(Settings settings) {
         deleteDirectory(new File("/tmp/neo4j"));
         settings.graphs.put("graph", "conf/neo4j-empty.properties");
@@ -140,8 +165,13 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         client1.close();
         cluster1.close();
 
-        assertThat(recordingAppender.getMessages(), hasItem("INFO - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
-        assertThat(recordingAppender.getMessages(), hasItem("INFO - Session shouldCloseSessionOnClientClose closed\n"));
+        // the session close log messages are not written with UnifiedChannelizer, so check the session state directly
+        if (isUsingUnifiedChannelizer()) {
+            assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+        } else {
+            assertThat(recordingAppender.getMessages(), hasItem("INFO - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
+            assertThat(recordingAppender.getMessages(), hasItem("INFO - Session shouldCloseSessionOnClientClose closed\n"));
+        }
 
         // try to reconnect to that session and make sure no state is there
         final Cluster clusterReconnect = TestClientFactory.open();
@@ -159,16 +189,27 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         // the commit from client1 should not have gone through so there should be no data present.
         assertEquals(0, clientReconnect.submit("graph.traversal().V().count()").all().join().get(0).getInt());
         clusterReconnect.close();
+
+        if (isUsingUnifiedChannelizer()) {
+            assertEquals(0, ((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().getActiveSessionCount());
+        }
     }
 
     @Test
     public void shouldUseGlobalFunctionCache() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect(name.getMethodName());
+        final Client session = cluster.connect(name.getMethodName());
+        final Client client = cluster.connect();
+
+        assertEquals(3, session.submit("def sumItUp(x,y){x+y};sumItUp(1,2)").all().get().get(0).getInt());
+        assertEquals(3, session.submit("sumItUp(1,2)").all().get().get(0).getInt());
 
         try {
-            assertEquals(3, client.submit("def addItUp(x,y){x+y};addItUp(1,2)").all().get().get(0).getInt());
-            assertEquals(3, client.submit("addItUp(1,2)").all().get().get(0).getInt());
+            client.submit("sumItUp(1,2)").all().get().get(0).getInt();
+            fail("Functions defined in a session should not be visible to sessionless requests so the call to sumItUp() should fail");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root.getMessage(), startsWith("No signature of method"));
         } finally {
             cluster.close();
         }
@@ -180,14 +221,14 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         final Client client = cluster.connect(name.getMethodName());
 
         try {
-            assertEquals(3, client.submit("def addItUp(x,y){x+y};addItUp(1,2)").all().get().get(0).getInt());
+            assertEquals(3, client.submit("def sumItUp(x,y){x+y};sumItUp(1,2)").all().get().get(0).getInt());
         } catch (Exception ex) {
             cluster.close();
             throw ex;
         }
 
         try {
-            client.submit("addItUp(1,2)").all().get().get(0).getInt();
+            client.submit("sumItUp(1,2)").all().get().get(0).getInt();
-            fail("Global functions should not be cached so the call to addItUp() should fail");
+            fail("Global functions should not be cached so the call to sumItUp() should fail");
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
@@ -273,8 +314,12 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             cluster.close();
         }
 
-        assertEquals(1, recordingAppender.getMessages().stream()
-                .filter(msg -> msg.equals("INFO - Session shouldCloseSessionOnceOnRequest closed\n")).count());
+        if (isUsingUnifiedChannelizer()) {
+            assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+        } else {
+            assertEquals(1, recordingAppender.getMessages().stream()
+                    .filter(msg -> msg.equals("INFO - Session shouldCloseSessionOnceOnRequest closed\n")).count());
+        }
     }
 
     @Test
@@ -308,9 +353,13 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             cluster.close();
         }
 
-        // there will be one for the timeout and a second for closing the cluster
-        assertEquals(2, recordingAppender.getMessages().stream()
-                .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
+        if (isUsingUnifiedChannelizer()) {
+            assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+        } else {
+            // there will be one for the timeout and a second for closing the cluster
+            assertEquals(2, recordingAppender.getMessages().stream()
+                    .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
+        }
     }
 
     @Test
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
index 166764b..f28969c 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
@@ -22,6 +22,7 @@ import org.apache.http.NoHttpResponseException;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.Settings;
 
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.junit.Test;
 import org.junit.Assert;
 
@@ -29,6 +30,9 @@ import java.util.Map;
 import java.util.HashMap;
 import java.net.SocketException;
 
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assume.assumeThat;
+
 public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
 
     @Override
@@ -74,6 +78,7 @@ public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChanneliz
         final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
         final Map<String,Object> authConfig = new HashMap<>();
         authSettings.authenticator = SimpleAuthenticator.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
         authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
         authSettings.config = authConfig;
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
similarity index 58%
copy from gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
copy to gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
index 166764b..a090a20 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
@@ -18,55 +18,28 @@
  */
 package org.apache.tinkerpop.gremlin.server.channel;
 
-import org.apache.http.NoHttpResponseException;
-import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 
-import org.junit.Test;
-import org.junit.Assert;
-
-import java.util.Map;
 import java.util.HashMap;
-import java.net.SocketException;
-
-public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
-
-    @Override
-    public Settings overrideSettings(final Settings settings) {
-        super.overrideSettings(settings);
-        final String nameOfTest = name.getMethodName();
-        if (nameOfTest.equals("shouldBreakOnInvalidAuthenticationHandler") ) {
-            settings.authentication = getAuthSettings();
-            settings.authentication.authenticationHandler = "Foo.class";
-        }
-        return settings;
-    }
+import java.util.Map;
 
-    @Test
-    public void shouldBreakOnInvalidAuthenticationHandler() throws Exception {
-        final CombinedTestClient client =  new CombinedTestClient(getProtocol());
-        try {
-            client.sendAndAssert("2+2", 4);
-            Assert.fail("An exception should be thrown with an invalid authentication handler");
-        } catch (NoHttpResponseException | SocketException e) {
-        } finally {
-            client.close();
-        }
-    }
+public class UnifiedChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
 
     @Override
     public String getProtocol() {
-        return HTTP;
+        return WS_AND_HTTP;
     }
 
     @Override
     public String getSecureProtocol() {
-        return HTTPS;
+        return WSS_AND_HTTPS;
     }
 
     @Override
     public String getChannelizer() {
-        return HttpChannelizer.class.getName();
+        return UnifiedChannelizer.class.getName();
     }
 
     @Override
@@ -74,6 +47,7 @@ public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChanneliz
         final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
         final Map<String,Object> authConfig = new HashMap<>();
         authSettings.authenticator = SimpleAuthenticator.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
         authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
         authSettings.config = authConfig;
 
diff --git a/gremlin-tools/gremlin-benchmark/pom.xml b/gremlin-tools/gremlin-benchmark/pom.xml
index 688f44f..d0cdc0e 100644
--- a/gremlin-tools/gremlin-benchmark/pom.xml
+++ b/gremlin-tools/gremlin-benchmark/pom.xml
@@ -65,11 +65,6 @@ limitations under the License.
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.tinkerpop</groupId>
-            <artifactId>tinkergraph-gremlin</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.openjdk.jmh</groupId>
             <artifactId>jmh-core</artifactId>
             <version>${jmh.version}</version>
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index 3e50091..d04d2e1 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -143,11 +143,6 @@ limitations under the License.
             <version>1.2</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-compress</artifactId>
-            <version>1.19</version>
-        </dependency>
-        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>16.0.1</version>