You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2021/04/05 16:58:43 UTC

[tinkerpop] 01/07: TINKERPOP-2245 Added UnifiedChannelizer

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 0fa8f84d31b9b47698f6a27db48dbf9ac6a0cd89
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Thu Mar 25 15:30:25 2021 -0400

    TINKERPOP-2245 Added UnifiedChannelizer
    
    The UnifiedChannelizer consolidates gremlin execution threadpools and unifies/streamlines server request processing between sessionless and sessionful requests.
---
 .travis.yml                                        |   4 +
 CHANGELOG.asciidoc                                 |   1 +
 docs/src/upgrade/release-3.5.x.asciidoc            |  11 +
 .../apache/tinkerpop/gremlin/driver/Handler.java   |   1 -
 .../gremlin/server/AbstractChannelizer.java        |  29 +-
 .../apache/tinkerpop/gremlin/server/Context.java   |  51 ++
 .../tinkerpop/gremlin/server/GremlinServer.java    |  13 +-
 .../apache/tinkerpop/gremlin/server/Settings.java  |  32 +
 .../gremlin/server/channel/HttpChannelizer.java    |   4 +-
 .../gremlin/server/channel/UnifiedChannelizer.java |  77 +++
 .../server/channel/WebSocketChannelizer.java       |  12 +-
 .../handler/AbstractAuthenticationHandler.java     |  12 +-
 .../gremlin/server/handler/AbstractRexster.java    | 719 +++++++++++++++++++++
 .../handler/HttpBasicAuthenticationHandler.java    |  11 +-
 .../gremlin/server/handler/MultiRexster.java       | 207 ++++++
 .../tinkerpop/gremlin/server/handler/Rexster.java  |  84 +++
 .../SaslAndHttpBasicAuthenticationHandler.java     |  25 +-
 .../server/handler/SaslAuthenticationHandler.java  |  11 +-
 .../gremlin/server/handler/SingleRexster.java      |  73 +++
 .../gremlin/server/handler/UnifiedHandler.java     | 283 ++++++++
 .../handler/WsAndHttpChannelizerHandler.java       |   6 +-
 .../AbstractGremlinServerIntegrationTest.java      |  31 +-
 .../tinkerpop/gremlin/server/ContextTest.java      |   3 +-
 .../gremlin/server/GremlinDriverIntegrateTest.java |  68 +-
 ...emlinServerAuditLogDeprecatedIntegrateTest.java |   5 +-
 .../server/GremlinServerAuditLogIntegrateTest.java |   4 +-
 .../server/GremlinServerAuthKrb5IntegrateTest.java |   2 +-
 .../server/GremlinServerAuthzIntegrateTest.java    |   3 +
 .../server/GremlinServerHttpIntegrateTest.java     |   4 +-
 .../gremlin/server/GremlinServerIntegrateTest.java |  30 +-
 .../server/GremlinServerSessionIntegrateTest.java  |  73 ++-
 .../channel/HttpChannelizerIntegrateTest.java      |   5 +
 ...t.java => UnifiedChannelizerIntegrateTest.java} |  42 +-
 gremlin-tools/gremlin-benchmark/pom.xml            |   5 -
 hadoop-gremlin/pom.xml                             |   5 -
 35 files changed, 1827 insertions(+), 119 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index a1456e7..ff3a058 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -60,6 +60,10 @@ jobs:
       name: "gremlin server"
     - script:
         - "mvn clean install -q -DskipTests -Dci"
+        - "travis_wait 60 mvn verify -pl :gremlin-server -DskipTests -DskipIntegrationTests=false -DincludeNeo4j -DtestUnified=true"
+      name: "gremlin server - unified"
+    - script:
+        - "mvn clean install -q -DskipTests -Dci"
         - "mvn verify -pl :gremlin-console -DskipTests -DskipIntegrationTests=false"
       name: "gremlin console"
     - script:
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index b15b0ab..041c278 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -30,6 +30,7 @@ This release also includes changes from <<release-3-4-3, 3.4.3>>.
 * Added a fully shaded version of `gremlin-driver`.
 * Exposed websocket connection status in JavaScript driver.
 * Fixed a bug where spark-gremlin was not re-attaching properties when using `dedup()`.
+* Fixed a bug in `WsAndHttpChannelizer` pipeline configuration where failed object aggregation could not write back HTTP responses.
 * Ensured better consistency of the use of `null` as arguments to mutation steps.
 * Added a `ResponseStatusCode` to indicate that a client should retry its request.
 * Added `TemporaryException` interface to indicate that a transaction can be retried.
diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index 77a7ce0..3b99d58 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -410,6 +410,17 @@ these values are not hashable and will result in an error. By introducing a `Has
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-2395[TINKERPOP-2395],
 link:https://issues.apache.org/jira/browse/TINKERPOP-2407[TINKERPOP-2407]
 
+==== Gremlin Server UnifiedChannelizer
+
+Just some notes for later:
+
+* UnifiedChannelizer technically replaces all existing implementations but is not yet the default
+* Some new settings related to it: maxParameters, sessionLifetimeTimeout, useGlobalFunctionCacheForSessions, useCommonEngineForSessions
+* Session behavior shifts slightly under this channelizer for async calls, where a failure will mean that the session
+will close, remaining requests in the queue will be ignored and rollback will occur.
+* care should be taken with strict transaction management and multi-graph transactions (which aren't real - not a new thing)
+* absolute max lifetime of a session is a new thing
+
 ==== Gremlin Server Audit Logging
 
 The `authentication.enableAuditlog` configuration property is deprecated, but replaced by the `enableAuditLog` property
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index aa15597..11bf8b4 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -222,7 +222,6 @@ final class Handler {
             final ResultQueue queue = pending.get(response.getRequestId());
             if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
                 final Object data = response.getResult().getData();
-                final Map<String,Object> meta = response.getResult().getMeta();
 
                 // this is a "result" from the server which is either the result of a script or a
                 // serialized traversal
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
index 1e2287ea..ee02bd8 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
@@ -50,6 +50,7 @@ import javax.net.ssl.TrustManagerFactory;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Constructor;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
@@ -98,6 +99,7 @@ public abstract class AbstractChannelizer extends ChannelInitializer<SocketChann
     public static final String PIPELINE_AUTHORIZER = "authorizer";
     public static final String PIPELINE_REQUEST_HANDLER = "request-handler";
     public static final String PIPELINE_HTTP_RESPONSE_ENCODER = "http-response-encoder";
+    public static final String PIPELINE_HTTP_AGGREGATOR = "http-aggregator";
     public static final String PIPELINE_WEBSOCKET_SERVER_COMPRESSION = "web-socket-server-compression-handler";
 
     protected static final String PIPELINE_SSL = "ssl";
@@ -182,10 +184,29 @@ public abstract class AbstractChannelizer extends ChannelInitializer<SocketChann
     protected AbstractAuthenticationHandler createAuthenticationHandler(final Settings settings) {
         try {
             final Class<?> clazz = Class.forName(settings.authentication.authenticationHandler);
-            final Class[] constructorArgs = new Class[2];
-            constructorArgs[0] = Authenticator.class;
-            constructorArgs[1] = Settings.class;
-            return (AbstractAuthenticationHandler) clazz.getDeclaredConstructor(constructorArgs).newInstance(authenticator, settings);
+            AbstractAuthenticationHandler aah;
+            try {
+                // the three arg constructor is the new form as a handler may need the authorizer in some cases
+                final Class<?>[] threeArgForm = new Class[]{Authenticator.class, Authorizer.class, Settings.class};
+                final Constructor<?> twoArgConstructor = clazz.getDeclaredConstructor(threeArgForm);
+                return (AbstractAuthenticationHandler) twoArgConstructor.newInstance(authenticator, authorizer, settings);
+            } catch (Exception threeArgEx) {
+                try {
+                    // the two arg constructor is the "old form" that existed prior to Authorizers. should probably
+                    // deprecate this form
+                    final Class<?>[] twoArgForm = new Class[]{Authenticator.class, Settings.class};
+                    final Constructor<?> twoArgConstructor = clazz.getDeclaredConstructor(twoArgForm);
+
+                    if (authorizer != null) {
+                        logger.warn("There is an authorizer configured but the {} does not have a constructor of ({}, {}, {}) so it cannot be added",
+                                clazz.getName(), Authenticator.class.getSimpleName(), Authorizer.class.getSimpleName(), Settings.class.getSimpleName());
+                    }
+
+                    return (AbstractAuthenticationHandler) twoArgConstructor.newInstance(authenticator, settings);
+                } catch (Exception twoArgEx) {
+                    throw twoArgEx;
+                }
+            }
         } catch (Exception ex) {
             logger.warn(ex.getMessage());
             throw new IllegalStateException(String.format("Could not create/configure AuthenticationHandler %s", settings.authentication.authenticationHandler), ex);
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index 902a788..fcd2072 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -18,16 +18,21 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinScriptChecker;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.server.handler.Frame;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -45,6 +50,11 @@ public class Context {
     private final GremlinExecutor gremlinExecutor;
     private final ScheduledExecutorService scheduledExecutorService;
     private final AtomicBoolean finalResponseWritten = new AtomicBoolean();
+    private final long requestTimeout;
+    private final RequestContentType requestContentType;
+    private final Object gremlinArgument;
+
+    public enum RequestContentType { BYTECODE, SCRIPT, UNKNOWN }
 
     public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx,
                    final Settings settings, final GraphManager graphManager,
@@ -55,6 +65,23 @@ public class Context {
         this.graphManager = graphManager;
         this.gremlinExecutor = gremlinExecutor;
         this.scheduledExecutorService = scheduledExecutorService;
+
+        // order of calls matter as one depends on the next
+        this.gremlinArgument = requestMessage.getArgs().get(Tokens.ARGS_GREMLIN);
+        this.requestContentType = determineRequestContents();
+        this.requestTimeout = determineTimeout();
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public RequestContentType getRequestContentType() {
+        return requestContentType;
+    }
+
+    public Object getGremlinArgument() {
+        return gremlinArgument;
     }
 
     public ScheduledExecutorService getScheduledExecutorService() {
@@ -133,4 +160,28 @@ public class Context {
         }
 
     }
+
+    private RequestContentType determineRequestContents() {
+        if (gremlinArgument instanceof Bytecode)
+            return RequestContentType.BYTECODE;
+        else if (gremlinArgument instanceof String)
+            return RequestContentType.SCRIPT;
+        else
+            return RequestContentType.UNKNOWN;
+    }
+
+    private long determineTimeout() {
+        // timeout override - handle both deprecated and newly named configuration. earlier logic should prevent
+        // both configurations from being submitted at the same time
+        final Map<String, Object> args = requestMessage.getArgs();
+        final long seto = args.containsKey(Tokens.ARGS_EVAL_TIMEOUT) ?
+                ((Number) args.get(Tokens.ARGS_EVAL_TIMEOUT)).longValue() : settings.getEvaluationTimeout();
+
+        // override the timeout if the lifecycle has a value assigned. if the script contains with(timeout)
+        // options then allow that value to override what's provided on the lifecycle
+        final Optional<Long> timeoutDefinedInScript = requestContentType == RequestContentType.SCRIPT ?
+                GremlinScriptChecker.parse(gremlinArgument.toString()).getTimeout() : Optional.empty();
+
+        return timeoutDefinedInScript.orElse(seto);
+    }
 }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
index 2a1adf7..b322aa7 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
@@ -78,6 +78,7 @@ public class GremlinServer {
     private final ExecutorService gremlinExecutorService;
     private final ServerGremlinExecutor serverGremlinExecutor;
     private final boolean isEpollEnabled;
+    private Channelizer channelizer;
 
     /**
      * Construct a Gremlin Server instance from {@link Settings}.
@@ -159,7 +160,7 @@ public class GremlinServer {
                 }
             });
 
-            final Channelizer channelizer = createChannelizer(settings);
+            channelizer = createChannelizer(settings);
             channelizer.init(serverGremlinExecutor);
             b.group(bossGroup, workerGroup)
                     .childHandler(channelizer);
@@ -284,8 +285,10 @@ public class GremlinServer {
             }
 
             try {
-                if (gremlinExecutorService != null)
-                    gremlinExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS);
+                if (gremlinExecutorService != null) {
+                    if (!gremlinExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS))
+                        logger.warn("Gremlin thread pool did not fully terminate - continuing with shutdown process");
+                }
             } catch (InterruptedException ie) {
                 logger.warn("Timeout waiting for Gremlin thread pool to shutdown - continuing with shutdown process.");
             }
@@ -330,6 +333,10 @@ public class GremlinServer {
         return serverGremlinExecutor;
     }
 
+    public Channelizer getChannelizer() {
+        return channelizer;
+    }
+
     public static void main(final String[] args) throws Exception {
         // add to vm options: -Dlog4j.configuration=file:conf/log4j.properties
         printHeader();
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
index 3a535ab..3f84c19 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
 import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
 import org.apache.tinkerpop.gremlin.server.handler.AbstractAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.server.util.DefaultGraphManager;
@@ -197,6 +198,37 @@ public class Settings {
     public String graphManager = DefaultGraphManager.class.getName();
 
     /**
+     * Maximum number of parameters that can be passed on a request. Larger numbers may impact performance for scripts.
+     * The default is 16 and this setting only applies to the {@link UnifiedChannelizer}.
+     */
+    public int maxParameters = 16;
+
+    /**
+     * The time in milliseconds that a {@link UnifiedChannelizer} session can exist. A session's lifetime cannot be
+     * extended beyond this value irrespective of the number of requests and their individual timeouts. Requests must
+     * complete within this time frame. The default is 10 minutes.
+     */
+    public long sessionLifetimeTimeout = 600000;
+
+    /**
+     * Enable the global function cache for sessions when using the {@link UnifiedChannelizer}. This setting is only
+     * relevant when {@link #useCommonEngineForSessions} is {@code false}. When {@code true} it means that
+     * functions created in one request to a session remain available on the next request to that session.
+     */
+    public boolean useGlobalFunctionCacheForSessions = true;
+
+    /**
+     * When {@code true} and using the {@link UnifiedChannelizer} the same engine that will be used to serve
+     * sessionless requests will also be used to serve sessions. The default value of {@code true} is recommended as
+     * it reduces the amount of object creation required for each session and should generally lead to better
+     * performance especially if the expectation is that there will be many sessions being created and destroyed
+     * rapidly. Setting this value to {@code false} is mostly present to support specific use cases that may require
+     * each session having its own engine or to match previous functionality provided by the older channelizers
+     * produced prior to 3.5.0.
+     */
+    public boolean useCommonEngineForSessions = true;
+
+    /**
      * Configured metrics for Gremlin Server.
      */
     public ServerMetrics metrics = null;
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
index 7b3ad99..be2a7af 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
@@ -62,7 +62,9 @@ public class HttpChannelizer extends AbstractChannelizer {
         if (logger.isDebugEnabled())
             pipeline.addLast(new LoggingHandler("http-io", LogLevel.DEBUG));
 
-        pipeline.addLast(new HttpObjectAggregator(settings.maxContentLength));
+        final HttpObjectAggregator aggregator = new HttpObjectAggregator(settings.maxContentLength);
+        aggregator.setMaxCumulationBufferComponents(settings.maxAccumulationBufferComponents);
+        pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator);
 
         if (authenticator != null) {
             // Cannot add the same handler instance multiple times unless
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java
new file mode 100644
index 0000000..522baa2
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.channel;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.tinkerpop.gremlin.server.AbstractChannelizer;
+import org.apache.tinkerpop.gremlin.server.Channelizer;
+import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
+import org.apache.tinkerpop.gremlin.server.handler.UnifiedHandler;
+import org.apache.tinkerpop.gremlin.server.handler.WsAndHttpChannelizerHandler;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
+
+/**
+ * A {@link Channelizer} that supports websocket and HTTP requests and does so with the most streamlined processing
+ * model for Gremlin Server introduced with 3.5.0.
+ */
+public class UnifiedChannelizer extends AbstractChannelizer {
+
+    private WsAndHttpChannelizerHandler handler;
+    private UnifiedHandler unifiedHandler;
+    protected static final String PIPELINE_UNIFIED = "unified";
+
+    @Override
+    public void init(final ServerGremlinExecutor serverGremlinExecutor) {
+        super.init(serverGremlinExecutor);
+        handler = new WsAndHttpChannelizerHandler();
+        handler.init(serverGremlinExecutor, new HttpGremlinEndpointHandler(serializers, gremlinExecutor, graphManager, settings));
+
+        // these handlers don't share any state and can thus be initialized once per pipeline
+        unifiedHandler = new UnifiedHandler(settings, graphManager, gremlinExecutor, scheduledExecutorService, this);
+    }
+
+    @Override
+    public void configure(final ChannelPipeline pipeline) {
+        handler.configure(pipeline);
+        pipeline.addAfter(PIPELINE_HTTP_REQUEST_DECODER, "WsAndHttpChannelizerHandler", handler);
+    }
+
+    @Override
+    public void finalize(final ChannelPipeline pipeline) {
+        super.finalize(pipeline);
+        pipeline.remove(PIPELINE_OP_SELECTOR);
+        pipeline.remove(PIPELINE_OP_EXECUTOR);
+
+        pipeline.addLast(PIPELINE_UNIFIED, unifiedHandler);
+    }
+
+    public UnifiedHandler getUnifiedHandler() {
+        return unifiedHandler;
+    }
+
+    @Override
+    public boolean supportsIdleMonitor() {
+        return true;
+    }
+
+    @Override
+    public Object createIdleDetectionMessage() {
+        return handler.getWsChannelizer().createIdleDetectionMessage();
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
index c4ff402..a1864fd 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
@@ -81,8 +81,11 @@ public class WebSocketChannelizer extends AbstractChannelizer {
 
     @Override
     public void configure(final ChannelPipeline pipeline) {
+
         if (logger.isDebugEnabled())
-            pipeline.addLast(new LoggingHandler("log-io", LogLevel.DEBUG));
+            pipeline.addLast(new LoggingHandler("log-encoder-aggregator", LogLevel.DEBUG));
+
+        pipeline.addLast(PIPELINE_HTTP_RESPONSE_ENCODER, new HttpResponseEncoder());
 
         logger.debug("HttpRequestDecoder settings - maxInitialLineLength={}, maxHeaderSize={}, maxChunkSize={}",
                 settings.maxInitialLineLength, settings.maxHeaderSize, settings.maxChunkSize);
@@ -95,12 +98,7 @@ public class WebSocketChannelizer extends AbstractChannelizer {
                 settings.maxContentLength, settings.maxAccumulationBufferComponents);
         final HttpObjectAggregator aggregator = new HttpObjectAggregator(settings.maxContentLength);
         aggregator.setMaxCumulationBufferComponents(settings.maxAccumulationBufferComponents);
-        pipeline.addLast("aggregator", aggregator);
-
-        if (logger.isDebugEnabled())
-            pipeline.addLast(new LoggingHandler("log-aggregator-encoder", LogLevel.DEBUG));
-
-        pipeline.addLast(PIPELINE_HTTP_RESPONSE_ENCODER, new HttpResponseEncoder());
+        pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator);
         // Add compression extension for WebSocket defined in https://tools.ietf.org/html/rfc7692
         pipeline.addLast(PIPELINE_WEBSOCKET_SERVER_COMPRESSION, new WebSocketServerCompressionHandler());
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
index 026ad59..074e4ab 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
@@ -21,15 +21,25 @@ package org.apache.tinkerpop.gremlin.server.handler;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 
 /**
  * Provides an abstraction point to allow for http auth schemes beyond basic auth.
  */
 public abstract class AbstractAuthenticationHandler extends ChannelInboundHandlerAdapter {
     protected final Authenticator authenticator;
+    protected final Authorizer authorizer;
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #AbstractAuthenticationHandler(Authenticator, Authorizer)}.
+     */
+    @Deprecated
     public AbstractAuthenticationHandler(final Authenticator authenticator) {
-        this.authenticator = authenticator;
+        this(authenticator, null);
     }
 
+    public AbstractAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer) {
+        this.authenticator = authenticator;
+        this.authorizer = authorizer;
+    }
 }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
new file mode 100644
index 0000000..7f7c51c
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
@@ -0,0 +1,719 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import groovy.lang.GroovyRuntimeException;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.TimedInterruptTimeoutException;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
+import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.server.GremlinServer;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
+import org.apache.tinkerpop.gremlin.server.util.ExceptionHelper;
+import org.apache.tinkerpop.gremlin.server.util.TraverserIterator;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.util.TemporaryException;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.codehaus.groovy.control.MultipleCompilationErrorsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.Bindings;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+/**
+ * A base implementation of {@link Rexster} which offers some common functionality that matches typical Gremlin Server
+ * request response expectations for script, bytecode and graph operations. The class is designed to be extended but
+ * take care in understanding the way that different methods are called as they do depend on one another a bit. It
+ * may be best to examine the source code to determine how best to use this class or to extend from the higher order
+ * classes of {@link SingleRexster} or {@link MultiRexster}.
+ */
+public abstract class AbstractRexster implements Rexster, AutoCloseable {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractRexster.class);
+    private static final Logger auditLogger = LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME);
+
+    private final Channel initialChannel;
+    private final boolean transactionManaged;
+    private final String sessionId;
+    private final AtomicReference<ScheduledFuture<?>> sessionCancelFuture = new AtomicReference<>();
+    private final AtomicReference<Future<?>> sessionFuture = new AtomicReference<>();
+    private long actualTimeoutLength = 0;
+    private boolean actualTimeoutCausedBySession = false;
+    protected final GraphManager graphManager;
+    protected final ConcurrentMap<String, Rexster> sessions;
+    protected final Set<String> aliasesUsedByRexster = new HashSet<>();
+
+    /**
+     * Creates the {@code Rexster} and ties its lifetime to the channel of the supplied {@link Context}. When that
+     * channel closes, any unfinished session future is cancelled and {@link #close()} is called.
+     *
+     * @param gremlinContext supplies the channel this instance is bound to and the {@link GraphManager}
+     * @param sessionId the identifier of this instance which is also the key removed from {@code sessions} by
+     *                  {@link #close()}
+     * @param transactionManaged when {@code true} every request handled by
+     *                           {@link #handleIterator(Context, Iterator)} has its transaction committed or
+     *                           rolled back automatically
+     * @param sessions the shared registry that {@link #close()} removes this instance from
+     */
+    AbstractRexster(final Context gremlinContext, final String sessionId,
+                    final boolean transactionManaged, final ConcurrentMap<String, Rexster> sessions) {
+        // assign all state before the close listener is registered. the listener calls close(), which reads
+        // "sessions", and a listener added to an already completed future may be notified right away
+        this.transactionManaged = transactionManaged;
+        this.sessionId = sessionId;
+        this.initialChannel = gremlinContext.getChannelHandlerContext().channel();
+        this.sessions = sessions;
+        this.graphManager = gremlinContext.getGraphManager();
+
+        // close Rexster if the channel closes to cleanup and close transactions
+        this.initialChannel.closeFuture().addListener(f -> {
+            // cancel session worker or it will keep waiting for items to appear in the session queue
+            final Future<?> sf = sessionFuture.get();
+            if (sf != null && !sf.isDone()) {
+                sf.cancel(true);
+            }
+            close();
+        });
+    }
+
+    /**
+     * Gets the {@code transactionManaged} flag that was supplied to the constructor. When {@code true},
+     * {@link #handleIterator(Context, Iterator)} manages the transaction for every request regardless of the
+     * request arguments.
+     */
+    public boolean isTransactionManaged() {
+        return transactionManaged;
+    }
+
+    /**
+     * Gets the session identifier that was supplied to the constructor.
+     */
+    public String getSessionId() {
+        return sessionId;
+    }
+
+    /**
+     * Determines if the supplied channel is the same instance (compared by reference) as the channel taken from
+     * the {@link Context} given to the constructor.
+     *
+     * @param channel the channel to compare
+     * @return {@code true} if {@code channel} is the channel this instance was created with
+     */
+    public boolean isBoundTo(final Channel channel) {
+        return channel == initialChannel;
+    }
+
+    /**
+     * Gets the timeout length recorded by the last effective call to {@link #triggerTimeout(long, boolean)}. The
+     * value is {@code 0} until such a call has been made.
+     */
+    public long getActualTimeoutLength() {
+        return actualTimeoutLength;
+    }
+
+    /**
+     * Gets the {@code causedBySession} flag recorded by the last effective call to
+     * {@link #triggerTimeout(long, boolean)}. The value is {@code false} until such a call has been made.
+     */
+    public boolean isActualTimeoutCausedBySession() {
+        return actualTimeoutCausedBySession;
+    }
+
+    /**
+     * Looks up a {@link GremlinScriptEngine} by name from the script engine manager of the
+     * {@code GremlinExecutor} held by the supplied {@link Context}.
+     *
+     * @param context the context that provides the {@code GremlinExecutor}
+     * @param language the engine name passed to {@code getEngineByName()}
+     * @return whatever {@code getEngineByName()} returns for {@code language}
+     */
+    public GremlinScriptEngine getScriptEngine(final Context context, final String language) {
+        return context.getGremlinExecutor().getScriptEngineManager().getEngineByName(language);
+    }
+
+    /**
+     * Stores the future that {@link #close()} cancels. The future may only be assigned a single time.
+     *
+     * @param f the future to store
+     * @throws IllegalStateException if a future was already stored
+     */
+    @Override
+    public void setSessionCancelFuture(final ScheduledFuture<?> f) {
+        // the swap only succeeds while the reference still holds its initial null
+        final boolean assigned = sessionCancelFuture.compareAndSet(null, f);
+        if (!assigned) {
+            throw new IllegalStateException("Session cancellation future is already set");
+        }
+    }
+
+    /**
+     * Stores the future that is cancelled by {@link #triggerTimeout(long, boolean)} and when the channel
+     * closes. The future may only be assigned a single time.
+     *
+     * @param f the future to store
+     * @throws IllegalStateException if a future was already stored
+     */
+    @Override
+    public void setSessionFuture(final Future<?> f) {
+        // the swap only succeeds while the reference still holds its initial null
+        final boolean assigned = sessionFuture.compareAndSet(null, f);
+        if (!assigned) {
+            throw new IllegalStateException("Session future is already set");
+        }
+    }
+
+    /**
+     * Records the details of a timeout and cancels the session future with interruption. Nothing happens if no
+     * session future has been set or if it is already done.
+     *
+     * @param timeout the timeout length to record for later error reporting
+     * @param causedBySession whether the timeout to record was the lifetime of the session
+     */
+    @Override
+    public synchronized void triggerTimeout(final long timeout, final boolean causedBySession) {
+        final Future<?> running = sessionFuture.get();
+
+        // nothing to interrupt if the future was never set or has already finished
+        if (null == running || running.isDone()) return;
+
+        // record the details before cancelling as handleException() reads them to build the timeout message.
+        // triggering timeout triggers the stop of the Rexster Runnable which will end in close() for final cleanup
+        actualTimeoutCausedBySession = causedBySession;
+        actualTimeoutLength = timeout;
+        running.cancel(true);
+    }
+
+    /**
+     * Executes the Gremlin found in the request of the supplied {@link Context}. {@link Bytecode} is handed to
+     * {@link #fromBytecode(Context, Bytecode)} and anything else is cast to {@code String} and handed to
+     * {@link #fromScript(Context, String)}. A resulting iterator is streamed to the client with
+     * {@link #handleIterator(Context, Iterator)}.
+     *
+     * @param gremlinContext the context of the request to execute
+     * @throws RexsterException raised by {@link #handleException(Context, Throwable)} for any failure
+     */
+    protected void process(final Context gremlinContext) throws RexsterException {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final Object gremlinToExecute = msg.getArgs().get(Tokens.ARGS_GREMLIN);
+
+        // for strict transactions track the aliases used so that we can commit them and only them on close()
+        if (gremlinContext.getSettings().strictTransactionManagement) {
+            msg.optionalArgs(Tokens.ARGS_ALIASES).ifPresent(
+                    m -> aliasesUsedByRexster.addAll(((Map<String,String>) m).values()));
+        }
+
+        try {
+            // the result is optional as Bytecode could be a "graph operation" rather than a Traversal. graph
+            // operations don't need to be iterated and handle their own lifecycle
+            final Optional<Iterator<?>> itty;
+            if (gremlinToExecute instanceof Bytecode)
+                itty = fromBytecode(gremlinContext, (Bytecode) gremlinToExecute);
+            else
+                itty = Optional.of(fromScript(gremlinContext, (String) gremlinToExecute));
+
+            processAuditLog(gremlinContext.getSettings(), gremlinContext.getChannelHandlerContext(), gremlinToExecute);
+
+            if (itty.isPresent())
+                handleIterator(gremlinContext, itty.get());
+        } catch (Exception ex) {
+            handleException(gremlinContext, ex);
+        }
+    }
+
+    /**
+     * Converts any failure into a {@link RexsterException} that carries the {@link ResponseMessage} to report.
+     * This method always throws and never returns normally. The checks are made in the following order and the
+     * first match wins:
+     * <ol>
+     *     <li>a {@link RexsterException} is rethrown as it is</li>
+     *     <li>a {@link TemporaryException} anywhere in the cause chain yields {@code SERVER_ERROR_TEMPORARY}</li>
+     *     <li>a root cause of {@link TimedInterruptTimeoutException} or {@link TimeoutException} yields
+     *     {@code SERVER_ERROR_TIMEOUT}</li>
+     *     <li>a root cause of {@link InterruptedException}, {@link TraversalInterruptedException} or
+     *     {@link InterruptedIOException} yields {@code SERVER_ERROR_TIMEOUT} with a message built from the
+     *     details recorded by {@link #triggerTimeout(long, boolean)}</li>
+     *     <li>a root cause of {@link MultipleCompilationErrorsException} holding a single "Method too large"
+     *     error is logged as a warning and yields {@code SERVER_ERROR_EVALUATION}</li>
+     *     <li>a root cause of {@link GroovyRuntimeException}, {@link VerificationException} or
+     *     {@link ScriptException} yields {@code SERVER_ERROR_EVALUATION}</li>
+     *     <li>anything else yields {@code SERVER_ERROR}</li>
+     * </ol>
+     *
+     * @param gremlinContext the context of the request that failed
+     * @param t the failure to convert
+     * @throws RexsterException always
+     */
+    protected void handleException(final Context gremlinContext, final Throwable t) throws RexsterException {
+        if (t instanceof RexsterException) throw (RexsterException) t;
+
+        final Optional<Throwable> possibleTemporaryException = determineIfTemporaryException(t);
+        if (possibleTemporaryException.isPresent()) {
+            final Throwable temporaryException = possibleTemporaryException.get();
+            throw new RexsterException(temporaryException.getMessage(), t,
+                    ResponseMessage.build(gremlinContext.getRequestMessage())
+                            .code(ResponseStatusCode.SERVER_ERROR_TEMPORARY)
+                            .statusMessage(temporaryException.getMessage())
+                            .statusAttributeException(temporaryException).create());
+        }
+
+        final Throwable root = ExceptionUtils.getRootCause(t);
+
+        if (root instanceof TimedInterruptTimeoutException) {
+            // occurs when the TimedInterruptCustomizerProvider is in play
+            final String msg = String.format("A timeout occurred within the script during evaluation of [%s] - consider increasing the limit given to TimedInterruptCustomizerProvider",
+                    gremlinContext.getRequestMessage().getRequestId());
+            throw new RexsterException(msg, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .statusMessage("Timeout during script evaluation triggered by TimedInterruptCustomizerProvider")
+                    .create());
+        }
+
+        if (root instanceof TimeoutException) {
+            final String errorMessage = String.format("Script evaluation exceeded the configured threshold for request [%s]",
+                    gremlinContext.getRequestMessage().getRequestId());
+            throw new RexsterException(errorMessage, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .statusMessage(t.getMessage())
+                    .create());
+        }
+
+        if (root instanceof InterruptedException ||
+                root instanceof TraversalInterruptedException ||
+                root instanceof InterruptedIOException) {
+            final String msg = actualTimeoutCausedBySession ?
+                    String.format("Session closed - %s - sessionLifetimeTimeout of %s ms exceeded", sessionId, actualTimeoutLength) :
+                    String.format("Evaluation exceeded timeout threshold of %s ms", actualTimeoutLength);
+            throw new RexsterException(msg, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .statusMessage(msg).create());
+        }
+
+        if (root instanceof MultipleCompilationErrorsException && root.getMessage().contains("Method too large") &&
+                ((MultipleCompilationErrorsException) root).getErrorCollector().getErrorCount() == 1) {
+            final String errorMessage = String.format("The Gremlin statement that was submitted exceeds the maximum compilation size allowed by the JVM, please split it into multiple smaller statements - %s", trimMessage(gremlinContext.getRequestMessage()));
+            logger.warn(errorMessage);
+            throw new RexsterException(errorMessage, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+                    .statusMessage(errorMessage)
+                    .statusAttributeException(root).create());
+        }
+
+        // GroovyRuntimeException will hit a pretty wide range of eval type errors, like MissingPropertyException,
+        // CompilationFailedException, MissingMethodException, etc. If more specific handling is required then
+        // try to catch it earlier above.
+        if (root instanceof GroovyRuntimeException ||
+                root instanceof VerificationException ||
+                root instanceof ScriptException) {
+            throw new RexsterException(root.getMessage(), root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+                    .statusMessage(root.getMessage())
+                    .statusAttributeException(root).create());
+        }
+
+        throw new RexsterException(root.getClass().getSimpleName() + ": " + root.getMessage(), root,
+                ResponseMessage.build(gremlinContext.getRequestMessage())
+                        .code(ResponseStatusCode.SERVER_ERROR)
+                        .statusAttributeException(root)
+                        .statusMessage(root.getMessage()).create());
+    }
+
+    /**
+     * Used to decrease the size of a Gremlin script that triggered a "method too large" exception so that it
+     * doesn't log a massive text string nor return a large error message. The Gremlin argument of a copy of the
+     * message is cut to its first 1021 characters followed by "..." when it is longer than that. Shorter text is
+     * left as it is.
+     *
+     * @param msg the message to copy
+     * @return a copy of {@code msg} with a Gremlin argument of limited length
+     */
+    private RequestMessage trimMessage(final RequestMessage msg) {
+        final RequestMessage trimmedMsg = RequestMessage.from(msg).create();
+        if (trimmedMsg.getArgs().containsKey(Tokens.ARGS_GREMLIN)) {
+            final String gremlin = String.valueOf(trimmedMsg.getArgs().get(Tokens.ARGS_GREMLIN));
+
+            // only shorten text that is longer than the limit - an unconditional substring() throws
+            // StringIndexOutOfBoundsException for anything shorter
+            if (gremlin.length() > 1021)
+                trimmedMsg.getArgs().put(Tokens.ARGS_GREMLIN, gremlin.substring(0, 1021) + "...");
+        }
+
+        return trimmedMsg;
+    }
+
+    /**
+     * Searches the supplied exception and its chain of causes for the first {@link TemporaryException}. Finding
+     * one means the response should carry the error code that tells the client that it may retry.
+     *
+     * @param ex the exception whose chain is searched
+     * @return the first {@link TemporaryException} in the chain or an empty {@code Optional} if there is none
+     */
+    protected Optional<Throwable> determineIfTemporaryException(final Throwable ex) {
+        for (final Throwable candidate : ExceptionUtils.getThrowables(ex)) {
+            if (candidate instanceof TemporaryException) return Optional.of(candidate);
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Removes this instance from the session registry and cancels the session cancellation future if one was
+     * set and is not yet done. Calls made after the registry entry is gone return without doing anything.
+     */
+    @Override
+    public synchronized void close() {
+        // removing and testing in one step avoids a separate check followed by a removal on the shared map. a
+        // null result means there was no entry so this instance is already closing/closed
+        if (sessions.remove(sessionId) == null) return;
+
+        final ScheduledFuture<?> f = sessionCancelFuture.get();
+        if (f != null && !f.isDone()) f.cancel(true);
+    }
+
+    /**
+     * Evaluates a script with the script engine of the language named in the request and wraps the result in
+     * an iterator. The bindings for the evaluation are those of {@link #getWorkerBindings()} merged with the
+     * ones of the request by {@link #mergeBindingsFromRequest(Context, Bindings)}.
+     *
+     * @param gremlinContext the context of the request
+     * @param script the script to evaluate
+     * @return the evaluation result as an iterator
+     * @throws Exception if the bindings cannot be prepared or the evaluation fails
+     */
+    protected Iterator<?> fromScript(final Context gremlinContext, final String script) throws Exception {
+        final Map<String, Object> args = gremlinContext.getRequestMessage().getArgs();
+
+        // requests that do not name a language are evaluated as gremlin-groovy
+        final String language;
+        if (args.containsKey(Tokens.ARGS_LANGUAGE))
+            language = (String) args.get(Tokens.ARGS_LANGUAGE);
+        else
+            language = "gremlin-groovy";
+
+        final GremlinScriptEngine engine = getScriptEngine(gremlinContext, language);
+        final Bindings scriptBindings = mergeBindingsFromRequest(gremlinContext, getWorkerBindings());
+        return IteratorUtils.asIterator(engine.eval(script, scriptBindings));
+    }
+
+    /**
+     * Turns {@link Bytecode} into an iterator of results. The {@link TraversalSource} to use is the one named by
+     * the first entry of the "aliases" argument of the request. Bytecode that is a "graph operation" is given to
+     * {@link #handleGraphOperation(Context, Bytecode, Graph)} and produces no iterator.
+     *
+     * @param gremlinContext the context of the request
+     * @param bytecode the bytecode to translate or evaluate
+     * @return an iterator over the traversal results or an empty {@code Optional} for a graph operation
+     * @throws RexsterException with {@code REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS} if the request has no
+     * aliases or if the alias refers to a {@link TraversalSource} that the {@link GraphManager} does not have
+     * @throws Exception if the translation, evaluation or graph operation fails
+     */
+    protected Optional<Iterator<?>> fromBytecode(final Context gremlinContext, final Bytecode bytecode) throws Exception {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+
+        // bytecode cannot be processed without knowing the TraversalSource to use so report missing aliases as an
+        // invalid request rather than failing on an empty Optional or an empty Map
+        final Map<String, String> aliases = (Map<String, String>) msg.optionalArgs(Tokens.ARGS_ALIASES).orElse(null);
+        if (null == aliases || aliases.isEmpty()) {
+            final String error = "A request with bytecode requires the 'aliases' argument to identify the TraversalSource to use";
+            throw new RexsterException(error, ResponseMessage.build(msg)
+                    .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
+        }
+
+        final Map.Entry<String, String> alias = aliases.entrySet().iterator().next();
+        final String traversalSourceName = alias.getValue();
+
+        // the lookup returns null for a name that is not configured
+        final TraversalSource g = gremlinContext.getGraphManager().getTraversalSource(traversalSourceName);
+        if (null == g) {
+            final String error = String.format("The traversal source [%s] for alias [%s] is not configured on the server",
+                    traversalSourceName, alias.getKey());
+            throw new RexsterException(error, ResponseMessage.build(msg)
+                    .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
+        }
+
+        // handle bytecode based graph operations like commit/rollback commands
+        if (BytecodeHelper.isGraphOperation(bytecode)) {
+            handleGraphOperation(gremlinContext, bytecode, g.getGraph());
+            return Optional.empty();
+        }
+
+        final Traversal.Admin<?, ?> traversal;
+        final Optional<String> lambdaLanguage = BytecodeHelper.getLambdaLanguage(bytecode);
+        if (!lambdaLanguage.isPresent())
+            traversal = JavaTranslator.of(g).translate(bytecode);
+        else {
+            // bytecode with lambdas has to be evaluated by the script engine of the lambda language
+            final SimpleBindings bindings = new SimpleBindings();
+            bindings.put(traversalSourceName, g);
+            traversal = gremlinContext.getGremlinExecutor().getScriptEngineManager().getEngineByName(lambdaLanguage.get()).eval(bytecode, bindings, traversalSourceName);
+        }
+
+        // compile the traversal - without it getEndStep() has nothing in it
+        traversal.applyStrategies();
+
+        return Optional.of(new TraverserIterator(traversal));
+    }
+
+    /**
+     * Creates the base {@link Bindings} for a script evaluation from the bindings of the {@link GraphManager}.
+     * NOTE(review): {@code SimpleBindings(Map)} is documented to use the given map as its store rather than
+     * copying it - confirm whether {@code getAsBindings()} returns a fresh map on each call.
+     *
+     * @return bindings holding what {@code graphManager.getAsBindings()} provides
+     * @throws RexsterException not thrown by this implementation - presumably declared for overriding classes
+     */
+    protected Bindings getWorkerBindings() throws RexsterException {
+        return new SimpleBindings(graphManager.getAsBindings());
+    }
+
+    /**
+     * Adds the aliases and the bindings of the request to the supplied {@link Bindings}. Each alias is bound to
+     * the {@link Graph} of the aliased name or, if there is no such graph, to the {@link TraversalSource} of that
+     * name. The "bindings" argument of the request is applied last so that it overrides anything else.
+     *
+     * @param gremlinContext the context of the request
+     * @param bindings the bindings to add to, which is also the object returned
+     * @return the supplied {@code bindings}
+     * @throws RexsterException with {@code REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS} if an alias refers to
+     * something that is neither a graph nor a traversal source or if aliases are missing while
+     * {@code strictTransactionManagement} is enabled
+     */
+    protected Bindings mergeBindingsFromRequest(final Context gremlinContext, final Bindings bindings) throws RexsterException {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+
+        if (!msg.getArgs().containsKey(Tokens.ARGS_ALIASES)) {
+            // there are no aliases so determine if that's ok with Gremlin Server
+            if (gremlinContext.getSettings().strictTransactionManagement) {
+                final String error = "Gremlin Server is configured with strictTransactionManagement as 'true' - the 'aliases' arguments must be provided";
+                throw new RexsterException(error, ResponseMessage.build(msg)
+                        .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
+            }
+        } else {
+            // alias any global bindings to a different variable.
+            final Map<String, String> aliases = (Map<String, String>) msg.getArgs().get(Tokens.ARGS_ALIASES);
+            for (Map.Entry<String,String> aliasKv : aliases.entrySet()) {
+                final String aliasName = aliasKv.getKey();
+                final String aliasedName = aliasKv.getValue();
+
+                // a Graph instance takes precedence. if there isn't one then perhaps the alias refers to a
+                // TraversalSource - it needs to be something
+                Object aliased = gremlinContext.getGraphManager().getGraph(aliasedName);
+                if (null == aliased)
+                    aliased = gremlinContext.getGraphManager().getTraversalSource(aliasedName);
+
+                // this validation is important to calls to GraphManager.commit() and rollback() as they both
+                // expect that the aliases supplied are valid
+                if (null == aliased) {
+                    final String error = String.format("Could not alias [%s] to [%s] as [%s] not in the Graph or TraversalSource global bindings",
+                            aliasName, aliasedName, aliasedName);
+                    throw new RexsterException(error, ResponseMessage.build(msg)
+                            .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
+                }
+
+                bindings.put(aliasName, aliased);
+            }
+        }
+
+        // add any bindings to override any other supplied
+        final Map<String, Object> requestBindings = (Map<String, Object>) msg.getArgs().get(Tokens.ARGS_BINDINGS);
+        if (null != requestBindings) bindings.putAll(requestBindings);
+
+        return bindings;
+    }
+
+    /**
+     * Provides a generic way of iterating a result set back to the client. An iterator without any results is
+     * answered with a single {@code NO_CONTENT} response. Otherwise results are collected into batches, whose size
+     * comes from the settings unless the request overrides it, and each batch is written as
+     * {@code PARTIAL_CONTENT} while more results remain and as {@code SUCCESS} for the last one. If the
+     * transaction is managed for the request then it is committed before the final response is written and it is
+     * rolled back if the channel is no longer active or a frame could not be made. Iteration pauses in steps of
+     * 10 ms while the channel is not writable.
+     *
+     * @param gremlinContext The Gremlin Server {@link Context} object containing settings, request message, etc.
+     * @param itty The result to iterate
+     * @throws InterruptedException if the thread is found to be interrupted at the start of a pass through the
+     * iteration loop or while pausing for the channel to become writable
+     */
+    protected void handleIterator(final Context gremlinContext, final Iterator<?> itty) throws InterruptedException {
+        final ChannelHandlerContext nettyContext = gremlinContext.getChannelHandlerContext();
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final Settings settings = gremlinContext.getSettings();
+        boolean warnOnce = false;
+
+        // sessionless requests are always transaction managed, but in-session requests are configurable.
+        final boolean managedTransactionsForRequest = transactionManaged ?
+                true : (Boolean) msg.getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false);
+
+        // we have an empty iterator - happens on stuff like: g.V().iterate()
+        if (!itty.hasNext()) {
+            final Map<String, Object> attributes = generateStatusAttributes(gremlinContext,ResponseStatusCode.NO_CONTENT, itty);
+            // as there is nothing left to iterate if we are transaction managed then we should execute a
+            // commit here before we send back a NO_CONTENT which implies success
+            if (managedTransactionsForRequest)
+                closeTransaction(gremlinContext, Transaction.Status.COMMIT);
+
+            gremlinContext.writeAndFlush(ResponseMessage.build(msg)
+                    .code(ResponseStatusCode.NO_CONTENT)
+                    .statusAttributes(attributes)
+                    .create());
+            return;
+        }
+
+        // the batch size can be overridden by the request
+        final int resultIterationBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
+                .orElse(settings.resultIterationBatchSize);
+        List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
+
+        // use an external control to manage the loop as opposed to just checking hasNext() in the while.  this
+        // prevents situations where auto transactions create a new transaction after calls to commit() within
+        // the loop on calls to hasNext().
+        boolean hasMore = itty.hasNext();
+
+        while (hasMore) {
+            if (Thread.interrupted()) throw new InterruptedException();
+
+            // check if an implementation needs to force flush the aggregated results before the iteration batch
+            // size is reached.
+            // todo: what implementation does this?! can we kill it going forward - seems always false
+            // final boolean forceFlush = isForceFlushed(nettyContext, msg, itty);
+            final boolean forceFlush = false;
+
+            // have to check the aggregate size because it is possible that the channel is not writeable (below)
+            // so iterating next() if the message is not written and flushed would bump the aggregate size beyond
+            // the expected resultIterationBatchSize.  Total serialization time for the response remains in
+            // effect so if the client is "slow" it may simply timeout.
+            //
+            // there is a need to check hasNext() on the iterator because if the channel is not writeable the
+            // previous pass through the while loop will have next()'d the iterator and if it is "done" then a
+            // NoSuchElementException will raise its head. also need a check to ensure that this iteration doesn't
+            // require a forced flush which can be forced by sub-classes.
+            //
+            // this could be placed inside the isWriteable() portion of the if-then below but it seems better to
+            // allow iteration to continue into a batch if that is possible rather than just doing nothing at all
+            // while waiting for the client to catch up
+            if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next());
+
+            // Don't keep executor busy if client has already given up; there is no way to catch up if the channel is
+            // not active, and hence we should break the loop.
+            if (!nettyContext.channel().isActive()) {
+                if (managedTransactionsForRequest) {
+                    closeTransaction(gremlinContext, Transaction.Status.ROLLBACK);
+                }
+                break;
+            }
+
+            // send back a page of results if batch size is met or if it's the end of the results being iterated.
+            // also check writeability of the channel to prevent OOME for slow clients.
+            //
+            // clients might decide to close the Netty channel to the server with a CloseWebsocketFrame after errors
+            // like CorruptedFrameException. On the server, although the channel gets closed, there might be some
+            // executor threads waiting for watermark to clear which will not clear in these cases since client has
+            // already given up on these requests. This leads to these executors waiting for the client to consume
+            // results till the timeout. checking for isActive() should help prevent that.
+            if (nettyContext.channel().isActive() && nettyContext.channel().isWritable()) {
+                if (forceFlush || aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
+                    final ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
+                    Frame frame = null;
+                    try {
+                        frame = makeFrame(gremlinContext, aggregate, code, itty);
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+
+                        // exception is handled in makeFrame() - serialization error gets written back to driver
+                        // at that point
+                        if (managedTransactionsForRequest)
+                            closeTransaction(gremlinContext, Transaction.Status.ROLLBACK);
+                        break;
+                    }
+
+                    // track whether there is anything left in the iterator because it needs to be accessed after
+                    // the transaction could be closed - in that case a call to hasNext() could open a new transaction
+                    // unintentionally
+                    final boolean moreInIterator = itty.hasNext();
+
+                    try {
+                        // only need to reset the aggregation list if there's more stuff to write
+                        if (moreInIterator)
+                            aggregate = new ArrayList<>(resultIterationBatchSize);
+                        else {
+                            // iteration and serialization are both complete which means this finished successfully. note that
+                            // errors internal to script eval or timeout will rollback given GremlinServer's global configurations.
+                            // local errors will get rolledback below because the exceptions aren't thrown in those cases to be
+                            // caught by the GremlinExecutor for global rollback logic. this only needs to be committed if
+                            // there are no more items to iterate and serialization is complete
+                            if (managedTransactionsForRequest)
+                                closeTransaction(gremlinContext, Transaction.Status.COMMIT);
+
+                            // exit the result iteration loop as there are no more results left.  using this external control
+                            // because of the above commit.  some graphs may open a new transaction on the call to
+                            // hasNext()
+                            hasMore = false;
+                        }
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+                        throw ex;
+                    }
+
+                    if (!moreInIterator) iterateComplete(gremlinContext, itty);
+
+                    // the flush is called after the commit has potentially occurred.  in this way, if a commit was
+                    // required then it will be 100% complete before the client receives it. the "frame" at this point
+                    // should have completely detached objects from the transaction (i.e. serialization has occurred)
+                    // so a new one should not be opened on the flush down the netty pipeline
+                    gremlinContext.writeAndFlush(code, frame);
+                }
+            } else {
+                // don't keep triggering this warning over and over again for the same request
+                if (!warnOnce) {
+                    logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
+                    warnOnce = true;
+                }
+
+                // since the client is lagging we can hold here for a period of time for the client to catch up.
+                // this isn't blocking the IO thread - just a worker.
+                TimeUnit.MILLISECONDS.sleep(10);
+            }
+        }
+    }
+
+    /**
+     * Processes {@link Bytecode} that was detected to contain a "graph operation", meaning a transaction commit
+     * or rollback. The transaction is closed accordingly and a {@code NO_CONTENT} response is written to signal
+     * success. If the graph does not support transactions the method returns without writing a response.
+     *
+     * @param gremlinContext the context of the request
+     * @param bytecode the graph operation
+     * @param graph the graph whose features decide if transactions are supported
+     * @throws IllegalStateException if the bytecode is neither a commit nor a rollback
+     * @throws Exception if closing the transaction fails
+     */
+    protected void handleGraphOperation(final Context gremlinContext, final Bytecode bytecode, final Graph graph) throws Exception {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+
+        // there is nothing to commit or rollback without transaction support
+        if (!graph.features().graph().supportsTransactions()) return;
+
+        final boolean commit = bytecode.equals(Bytecode.TX_COMMIT);
+        if (!commit && !bytecode.equals(Bytecode.TX_ROLLBACK)) {
+            throw new IllegalStateException(String.format(
+                    "Bytecode in request is not a recognized graph operation: %s", bytecode.toString()));
+        }
+
+        final Transaction.Status status = commit ? Transaction.Status.COMMIT : Transaction.Status.ROLLBACK;
+        closeTransaction(gremlinContext, status);
+
+        // write back a no-op for success
+        final Map<String, Object> attributes = generateStatusAttributes(gremlinContext,
+                ResponseStatusCode.NO_CONTENT, Collections.emptyIterator());
+        final ResponseMessage response = ResponseMessage.build(msg)
+                .code(ResponseStatusCode.NO_CONTENT)
+                .statusAttributes(attributes)
+                .create();
+        gremlinContext.writeAndFlush(response);
+    }
+
+    /**
+     * Called when iteration within {@link #handleIterator(Context, Iterator)} is on its final pass and the final
+     * frame is about to be sent back to the client. This method only gets called on successful iteration of the
+     * entire result. The default implementation is empty.
+     *
+     * @param gremlinContext the context of the request that was iterated
+     * @param itty the iterator that was exhausted
+     */
+    protected void iterateComplete(final Context gremlinContext, final Iterator<?> itty) {
+        // do nothing by default
+    }
+
+    /**
+     * Generates response status meta-data to put on a {@link ResponseMessage}.
+     *
+     * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+     *             this method
+     */
+    protected Map<String, Object> generateStatusAttributes(final Context gremlinContext,
+                                                           final ResponseStatusCode code, final Iterator<?> itty) {
+        // only return server metadata on the last message
+        if (itty.hasNext()) return Collections.emptyMap();
+
+        final Map<String, Object> metaData = new HashMap<>();
+        metaData.put(Tokens.ARGS_HOST, gremlinContext.getChannelHandlerContext().channel().remoteAddress().toString());
+
+        return metaData;
+    }
+
+    /**
+     * Generates response result meta-data to put on a {@link ResponseMessage}.
+     *
+     * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+     *             this method
+     */
+    protected Map<String, Object> generateResponseMetaData(final Context gremlinContext,
+                                                           final ResponseStatusCode code, final Iterator<?> itty) {
+        return Collections.emptyMap();
+    }
+
+    protected Frame makeFrame(final Context gremlinContext, final List<Object> aggregate,
+                              final ResponseStatusCode code, final Iterator<?> itty) throws Exception {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final ChannelHandlerContext nettyContext = gremlinContext.getChannelHandlerContext();
+        final MessageSerializer serializer = nettyContext.channel().attr(StateKey.SERIALIZER).get();
+        final boolean useBinary = nettyContext.channel().attr(StateKey.USE_BINARY).get();
+
+        final Map<String, Object> responseMetaData = generateResponseMetaData(gremlinContext, code, itty);
+        final Map<String, Object> statusAttributes = generateStatusAttributes(gremlinContext, code, itty);
+        try {
+            if (useBinary) {
+                return new Frame(serializer.serializeResponseAsBinary(ResponseMessage.build(msg)
+                        .code(code)
+                        .statusAttributes(statusAttributes)
+                        .responseMetaData(responseMetaData)
+                        .result(aggregate).create(), nettyContext.alloc()));
+            } else {
+                // the expectation is that the GremlinTextRequestDecoder will have placed a MessageTextSerializer
+                // instance on the channel.
+                final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
+                return new Frame(textSerializer.serializeResponseAsString(ResponseMessage.build(msg)
+                        .code(code)
+                        .statusAttributes(statusAttributes)
+                        .responseMetaData(responseMetaData)
+                        .result(aggregate).create()));
+            }
+        } catch (Exception ex) {
+            logger.warn("The result [{}] in the request {} could not be serialized and returned.", aggregate, msg.getRequestId(), ex);
+            final String errorMessage = String.format("Error during serialization: %s", ExceptionHelper.getMessageFromExceptionOrCause(ex));
+            final ResponseMessage error = ResponseMessage.build(msg.getRequestId())
+                    .statusMessage(errorMessage)
+                    .statusAttributeException(ex)
+                    .code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create();
+            gremlinContext.writeAndFlush(error);
+            throw ex;
+        }
+    }
+
+    /**
+     * Called right before a transaction starts within {@link #run()}.
+     */
+    protected void startTransaction(final Context gremlinContext) {
+        // check if transactions are open and rollback first to ensure a fresh start.
+        graphManager.rollbackAll();
+    }
+
+    /**
+     * Closes the transaction without a {@link Context} by supplying {@code null} for that argument to
+     * {@link #closeTransaction(Context, Transaction.Status)}. This method is idempotent.
+     */
+    protected void closeTransaction(final Transaction.Status status) {
+        closeTransaction(null, status);
+    }
+
+    /**
+     * Tries to close the transaction without a {@link Context}, logging any exception. This method is idempotent.
+     */
+    protected void closeTransactionSafely(final Transaction.Status status) {
+        closeTransactionSafely(null, status);
+    }
+
+    /**
+     * Tries to close the transaction but will catch exceptions and log them. This method is idempotent.
+     */
+    protected void closeTransactionSafely(final Context gremlinContext, final Transaction.Status status) {
+        try {
+            closeTransaction(gremlinContext, status);
+        } catch (Exception ex) {
+            logger.error("Failed to close transaction", ex);
+        }
+    }
+
+    /**
+     * Closes a transaction with commit or rollback. Strict transaction management settings are observed when
+     * configured as such in {@link Settings#strictTransactionManagement} and when aliases are present on the
+     * request in the current {@link Context}. If the supplied {@link Context} is {@code null} then "strict" is
+     * bypassed so this form must be called with care. Bypassing is often useful to ensure that all transactions
+     * are cleaned up when multiple graphs are referenced. Prefer calling {@link #closeTransaction(Transaction.Status)}
+     * in this case instead. This method is idempotent.
+     */
+    protected void closeTransaction(final Context gremlinContext, final Transaction.Status status) {
+        if (status != Transaction.Status.COMMIT && status != Transaction.Status.ROLLBACK)
+            throw new IllegalStateException(String.format("Transaction.Status not supported: %s", status));
+
+        final boolean commit = status == Transaction.Status.COMMIT;
+        final boolean strict = gremlinContext != null && gremlinContext.getSettings().strictTransactionManagement;
+
+        if (strict) {
+            if (commit)
+                graphManager.commit(new HashSet<>(aliasesUsedByRexster));
+            else
+                graphManager.rollback(new HashSet<>(aliasesUsedByRexster));
+        } else {
+            if (commit)
+                graphManager.commitAll();
+            else
+                graphManager.rollbackAll();
+        }
+    }
+
+    private void processAuditLog(final Settings settings, final ChannelHandlerContext ctx, final Object gremlinToExecute) {
+        if (settings.enableAuditLog) {
+            AuthenticatedUser user = ctx.channel().attr(StateKey.AUTHENTICATED_USER).get();
+            if (null == user) {    // This is expected when using the AllowAllAuthenticator
+                user = AuthenticatedUser.ANONYMOUS_USER;
+            }
+            String address = ctx.channel().remoteAddress().toString();
+            if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
+            auditLogger.info("User {} with address {} requested: {}", user.getName(), address, gremlinToExecute);
+        }
+
+        if (settings.authentication.enableAuditLog) {
+            String address = ctx.channel().remoteAddress().toString();
+            if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
+            auditLogger.info("User with address {} requested: {}", address, gremlinToExecute);
+        }
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
index a282874..a050ab0 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.server.Settings;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +55,16 @@ public class HttpBasicAuthenticationHandler extends AbstractAuthenticationHandle
 
     private final Base64.Decoder decoder = Base64.getUrlDecoder();
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #HttpBasicAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+     */
+    @Deprecated
     public HttpBasicAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
-        super(authenticator);
+        this(authenticator, null, settings);
+    }
+
+    public HttpBasicAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+        super(authenticator, authorizer);
         this.settings = settings;
     }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
new file mode 100644
index 0000000..539c16b
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngineManager;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.Bindings;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED;
+
+/**
+ * A {@link Rexster} implementation that queues tasks given to it and executes them in a serial fashion within the
+ * same thread which thus allows multiple tasks to be executed in the same transaction.
+ */
+public class MultiRexster extends AbstractRexster {
+    private static final Logger logger = LoggerFactory.getLogger(MultiRexster.class);
+    protected final BlockingQueue<Context> queue = new LinkedBlockingQueue<>();
+    private ScheduledFuture<?> requestCancelFuture;
+    private Bindings bindings;
+    private final AtomicBoolean ending = new AtomicBoolean(false);
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final GremlinScriptEngineManager scriptEngineManager;
+
+    MultiRexster(final Context gremlinContext, final String sessionId,
+                 final ConcurrentMap<String, Rexster> sessions) {
+        super(gremlinContext, sessionId, false, sessions);
+
+        // using a global function cache is cheaper than creating a new one per session especially if you have to
+        // create a lot of sessions. it will generate a ton of throw-away objects. mostly keeping the option open
+        // to not use it to preserve the ability to use the old functionality if wanted or if there is some specific
+        // use case with sessions that needs it. if we wanted this could eventually become a per-request option
+        // so that the client could control it as necessary and get scriptengine isolation if they need it.
+        if (gremlinContext.getSettings().useCommonEngineForSessions)
+            scriptEngineManager = gremlinContext.getGremlinExecutor().getScriptEngineManager();
+        else
+            scriptEngineManager = initializeGremlinExecutor(gremlinContext).getScriptEngineManager();
+
+        scheduledExecutorService = gremlinContext.getScheduledExecutorService();
+        addTask(gremlinContext);
+    }
+
+    @Override
+    public GremlinScriptEngine getScriptEngine(final Context gremlinContext, final String language) {
+        return scriptEngineManager.getEngineByName(language);
+    }
+
+    @Override
+    public boolean acceptingTasks() {
+        return !ending.get();
+    }
+
+    @Override
+    public void addTask(final Context gremlinContext) {
+        // todo: explicitly reject request???
+        if (acceptingTasks())
+            queue.offer(gremlinContext);
+    }
+
+    @Override
+    public void run() {
+        // there must be one item in the queue at least since addTask() gets called before the worker
+        // is ever started
+        Context gremlinContext = queue.poll();
+        if (null == gremlinContext)
+            throw new IllegalStateException(String.format("Worker has no initial context for session: %s", getSessionId()));
+
+        try {
+            startTransaction(gremlinContext);
+            try {
+                while (true) {
+                    // schedule timeout for the current request from the queue
+                    final long seto = gremlinContext.getRequestTimeout();
+                    requestCancelFuture = scheduledExecutorService.schedule(
+                            () -> this.triggerTimeout(seto, false),
+                            seto, TimeUnit.MILLISECONDS);
+
+                    process(gremlinContext);
+
+                    // work is done within the timeout period so cancel it
+                    cancelRequestTimeout();
+
+                    gremlinContext = queue.take();
+                }
+            } catch (Exception ex) {
+                // stop accepting requests on this worker since it is heading to close()
+                ending.set(true);
+
+                // the current context gets its exception handled...
+                handleException(gremlinContext, ex);
+            }
+        } catch (RexsterException rexex) {
+            // remaining work items in the queue are ignored since this worker is closing. must send
+            // back some sort of response to satisfy the client. the status code given to writeAndFlush
+            // differs from the one in the ResponseMessage as we don't want the message to be "final" for
+            // the Context. that status must be reserved for the message that caused the error
+            for (Context gctx : queue) {
+                gctx.writeAndFlush(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(gctx.getRequestMessage())
+                        .code(ResponseStatusCode.SERVER_ERROR)
+                        .statusMessage(String.format(
+                                "An earlier request [%s] failed prior to this one having a chance to execute",
+                                gremlinContext.getRequestMessage().getRequestId())).create());
+            }
+
+            // exception should trigger a rollback in the session. a more focused rollback may have occurred
+            // during process() and the related result iteration IF transaction management was enabled on
+            // the request
+            closeTransactionSafely(Transaction.Status.ROLLBACK);
+
+            logger.warn(rexex.getMessage(), rexex);
+            gremlinContext.writeAndFlush(rexex.getResponseMessage());
+        } finally {
+            close();
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        ending.set(true);
+        cancelRequestTimeout();
+        super.close();
+        logger.info("Session {} closed", getSessionId());
+    }
+
+    private void cancelRequestTimeout() {
+        if (requestCancelFuture != null && !requestCancelFuture.isDone())
+            requestCancelFuture.cancel(true);
+    }
+
+    @Override
+    protected Bindings getWorkerBindings() throws RexsterException {
+        if (null == bindings)
+            bindings = super.getWorkerBindings();
+        return this.bindings;
+    }
+
+    protected GremlinExecutor initializeGremlinExecutor(final Context gremlinContext) {
+        final Settings settings = gremlinContext.getSettings();
+        final ExecutorService executor = gremlinContext.getGremlinExecutor().getExecutorService();
+        final boolean useGlobalFunctionCache = settings.useGlobalFunctionCacheForSessions;
+
+        // these initial settings don't matter so much as we don't really execute things through the
+        // GremlinExecutor directly. Just doing all this setup to make GremlinExecutor do the work of
+        // rigging up the GremlinScriptEngineManager.
+        final GremlinExecutor.Builder gremlinExecutorBuilder = GremlinExecutor.build()
+                .evaluationTimeout(settings.getEvaluationTimeout())
+                .executorService(executor)
+                .globalBindings(graphManager.getAsBindings())
+                .scheduledExecutorService(scheduledExecutorService);
+
+        settings.scriptEngines.forEach((k, v) -> {
+            // use plugins if they are present
+            if (!v.plugins.isEmpty()) {
+                // make sure that server related classes are available at init. the LifeCycleHook stuff will be
+                // added explicitly via configuration using GremlinServerGremlinModule in the yaml. need to override
+                // scriptengine settings with SessionOpProcessor specific ones as the processing for sessions is
+                // different and a global setting may not make sense for a session
+                if (v.plugins.containsKey(GroovyCompilerGremlinPlugin.class.getName())) {
+                    v.plugins.get(GroovyCompilerGremlinPlugin.class.getName()).put(CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, useGlobalFunctionCache);
+                } else {
+                    final Map<String,Object> pluginConf = new HashMap<>();
+                    pluginConf.put(CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, useGlobalFunctionCache);
+                    v.plugins.put(GroovyCompilerGremlinPlugin.class.getName(), pluginConf);
+                }
+
+                gremlinExecutorBuilder.addPlugins(k, v.plugins);
+            }
+        });
+
+        return gremlinExecutorBuilder.create();
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
new file mode 100644
index 0000000..70869dd
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.channel.Channel;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.server.Context;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * The {@code Rexster} interface is essentially a form of "worker", named for Gremlin's trusty canine robot friend
+ * who represents the "server" aspect of TinkerPop. In the most generic sense, {@code Rexster} implementations take
+ * tasks in the form of a Gremlin Server {@link Context} and process them, which typically means "execute some Gremlin".
+ * Implementations have a fair amount of flexibility in terms of how they go about doing this, but the
+ * {@link SingleRexster} and the {@link MultiRexster} should handle most cases quite well and are extensible for
+ * providers.
+ */
+public interface Rexster extends Runnable {
+    /**
+     * Adds a task for Rexster to complete.
+     */
+    void addTask(final Context gremlinContext);
+
+    /**
+     * Sets a reference to the job that will cancel this Rexster if it exceeds its timeout period.
+     */
+    void setSessionCancelFuture(final ScheduledFuture<?> f);
+
+    /**
+     * Sets a reference to the job itself that is running this Rexster.
+     */
+    void setSessionFuture(final Future<?> f);
+
+    /**
+     * Provides a general way to tell Rexster that it has exceeded some timeout condition.
+     */
+    void triggerTimeout(final long timeout, boolean causedBySession);
+
+    /**
+     * Determines if the supplied {@code Channel} object is the same as the one bound to the {@code Session}.
+     */
+    boolean isBoundTo(final Channel channel);
+
+    /**
+     * Determines if this Rexster can accept additional tasks or not.
+     */
+    boolean acceptingTasks();
+
+    public class RexsterException extends Exception {
+        private final ResponseMessage responseMessage;
+
+        public RexsterException(final String message, final ResponseMessage responseMessage) {
+            super(message);
+            this.responseMessage = responseMessage;
+        }
+
+        public RexsterException(final String message, final Throwable cause, final ResponseMessage responseMessage) {
+            super(message, cause);
+            this.responseMessage = responseMessage;
+        }
+
+        public ResponseMessage getResponseMessage() {
+            return this.responseMessage;
+        }
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
index d58553d..31dabd0 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
@@ -21,14 +21,14 @@ package org.apache.tinkerpop.gremlin.server.handler;
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http.HttpMessage;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 import org.apache.tinkerpop.gremlin.server.Settings;
-import org.apache.tinkerpop.gremlin.server.handler.HttpBasicAuthenticationHandler;
-import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler;
-import org.apache.tinkerpop.gremlin.server.handler.WebSocketHandlerUtil;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 
+import static org.apache.tinkerpop.gremlin.server.AbstractChannelizer.PIPELINE_AUTHORIZER;
 import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_AUTHENTICATOR;
 
 /**
@@ -39,18 +39,33 @@ public class SaslAndHttpBasicAuthenticationHandler extends SaslAuthenticationHan
 
     private final String HTTP_AUTH = "http-authentication";
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #SaslAndHttpBasicAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+     */
+    @Deprecated
     public SaslAndHttpBasicAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
-        super(authenticator, settings);
+        this(authenticator, null, settings);
+    }
+
+    public SaslAndHttpBasicAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+        super(authenticator, authorizer, settings);
     }
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object obj) throws Exception {
         if (obj instanceof HttpMessage && !WebSocketHandlerUtil.isWebSocket((HttpMessage)obj)) {
-            ChannelPipeline pipeline = ctx.pipeline();
+            final ChannelPipeline pipeline = ctx.pipeline();
             if (null != pipeline.get(HTTP_AUTH)) {
                 pipeline.remove(HTTP_AUTH);
             }
             pipeline.addAfter(PIPELINE_AUTHENTICATOR, HTTP_AUTH, new HttpBasicAuthenticationHandler(authenticator, this.settings));
+
+            if (authorizer != null) {
+                final ChannelInboundHandlerAdapter authorizationHandler = new HttpBasicAuthorizationHandler(authorizer);
+                pipeline.remove(PIPELINE_AUTHORIZER);
+                pipeline.addAfter(HTTP_AUTH, PIPELINE_AUTHORIZER, authorizationHandler);
+            }
+
             ctx.fireChannelRead(obj);
         } else {
             super.channelRead(ctx, obj);
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
index e8216a6..55da853 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
@@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.server.Settings;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,8 +60,16 @@ public class SaslAuthenticationHandler extends AbstractAuthenticationHandler {
 
     protected final Settings settings;
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #SaslAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+     */
+    @Deprecated
     public SaslAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
-        super(authenticator);
+        this(authenticator, null, settings);
+    }
+
+    public SaslAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+        super(authenticator, authorizer);
         this.settings = settings;
     }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java
new file mode 100644
index 0000000..05d5dff
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+
+/**
+ * A simple {@link Rexster} implementation that accepts one request, processes it and exits.
+ */
+public class SingleRexster extends AbstractRexster {
+    private static final Logger logger = LoggerFactory.getLogger(SingleRexster.class);
+    protected final Context gremlinContext;
+
+    SingleRexster(final Context gremlinContext, final String sessionId,
+                  final ConcurrentMap<String, Rexster> sessions) {
+        super(gremlinContext, sessionId,true, sessions);
+        this.gremlinContext = gremlinContext;
+    }
+
+    /**
+     * The {@code SingleRexster} can only process one request so the initial construction of it already has the
+     * request in it and no more can be added, therefore this method always returns {@code false}.
+     */
+    @Override
+    public boolean acceptingTasks() {
+        return false;
+    }
+
+    @Override
+    public void addTask(final Context gremlinContext) {
+        throw new UnsupportedOperationException("SingleWorker doesn't accept tasks beyond the one provided to the constructor");
+    }
+
+    @Override
+    public void run() {
+        try {
+            startTransaction(gremlinContext);
+            process(gremlinContext);
+        } catch (RexsterException we) {
+            logger.warn(we.getMessage(), we);
+
+            // should have already rolled back - this is a safety valve
+            closeTransactionSafely(gremlinContext, Transaction.Status.ROLLBACK);
+
+            gremlinContext.writeAndFlush(we.getResponseMessage());
+        } finally {
+            close();
+        }
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
new file mode 100644
index 0000000..7483062
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.server.Channelizer;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.Rexster.RexsterException;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Handler for websockets to be used with the {@link UnifiedChannelizer}.
+ */
+@ChannelHandler.Sharable
+public class UnifiedHandler extends SimpleChannelInboundHandler<RequestMessage> {
+    private static final Logger logger = LoggerFactory.getLogger(UnifiedHandler.class);
+
+    protected final Settings settings;
+    protected final GraphManager graphManager;
+    protected final GremlinExecutor gremlinExecutor;
+    protected final ScheduledExecutorService scheduledExecutorService;
+    protected final ExecutorService executorService;
+    protected final Channelizer channelizer;
+
+    protected final ConcurrentMap<String, Rexster> sessions = new ConcurrentHashMap<>();
+
+    /**
+     * This may or may not be the full set of invalid binding keys.  It is dependent on the static imports made to
+     * Gremlin Server.  This should get rid of the worst offenders though and provide a good message back to the
+     * calling client.
+     * <p>
+     * Use of {@code toUpperCase()} on the accessor values of {@link T} solves an issue where the {@code ScriptEngine}
+     * ignores private scope on {@link T} and imports static fields.
+     */
+    protected static final Set<String> INVALID_BINDINGS_KEYS = new HashSet<>();
+
+    static {
+        INVALID_BINDINGS_KEYS.addAll(Arrays.asList(
+                T.id.name(), T.key.name(),
+                T.label.name(), T.value.name(),
+                T.id.getAccessor(), T.key.getAccessor(),
+                T.label.getAccessor(), T.value.getAccessor(),
+                T.id.getAccessor().toUpperCase(), T.key.getAccessor().toUpperCase(),
+                T.label.getAccessor().toUpperCase(), T.value.getAccessor().toUpperCase()));
+
+        for (Column enumItem : Column.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Order enumItem : Order.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Operator enumItem : Operator.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Scope enumItem : Scope.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Pop enumItem : Pop.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+    }
+
+    public UnifiedHandler(final Settings settings, final GraphManager graphManager,
+                          final GremlinExecutor gremlinExecutor,
+                          final ScheduledExecutorService scheduledExecutorService,
+                          final Channelizer channelizer) {
+        this.settings = settings;
+        this.graphManager = graphManager;
+        this.gremlinExecutor = gremlinExecutor;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.executorService = gremlinExecutor.getExecutorService();
+        this.channelizer = channelizer;
+    }
+
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final RequestMessage msg) throws Exception {
+        try {
+            try {
+                validateRequest(msg, graphManager);
+            } catch (RexsterException we) {
+                ctx.writeAndFlush(we.getResponseMessage());
+                return;
+            }
+
+            final Optional<String> optSession = msg.optionalArgs(Tokens.ARGS_SESSION);
+            final String sessionId = optSession.orElse(UUID.randomUUID().toString());
+
+            // still using GremlinExecutor here in the Context so that this object doesn't need to immediately
+            // change, but also because GremlinExecutor/ScriptEngine config is all rigged up into the server nicely
+            // right now. when the UnifiedChannelizer is "ready" we can factor out the GremlinExecutor
+            final Context gremlinContext = new Context(msg, ctx, settings, graphManager,
+                    gremlinExecutor, scheduledExecutorService);
+
+            if (sessions.containsKey(sessionId)) {
+                final Rexster rexster = sessions.get(sessionId);
+
+                // check if the session is still accepting requests - if not block further requests
+                if (!rexster.acceptingTasks()) {
+                    final String sessionClosedMessage = String.format(
+                            "Session %s is no longer accepting requests as it has been closed", sessionId);
+                    final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(sessionClosedMessage).create();
+                    ctx.writeAndFlush(response);
+                    return;
+                }
+
+                // check if the session is bound to this channel, thus one client per session
+                if (!rexster.isBoundTo(gremlinContext.getChannelHandlerContext().channel())) {
+                    final String sessionClosedMessage = String.format("Session %s is not bound to the connecting client", sessionId);
+                    final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(sessionClosedMessage).create();
+                    ctx.writeAndFlush(response);
+                    return;
+                }
+
+                rexster.addTask(gremlinContext);
+            } else {
+                final Rexster rexster = optSession.isPresent() ?
+                        createMulti(gremlinContext, sessionId) : createSingle(gremlinContext, sessionId);
+                final Future<?> sessionFuture = executorService.submit(rexster);
+                rexster.setSessionFuture(sessionFuture);
+                sessions.put(sessionId, rexster);
+
+                // determine the max session life. for multi that's going to be "session life" and for single that
+                // will be the span of the request timeout
+                final long seto = gremlinContext.getRequestTimeout();
+                final long sessionLife = optSession.isPresent() ? settings.sessionLifetimeTimeout : seto;
+
+                // the timeout is only scheduled when the request timeout is greater than zero
+                if (seto > 0) {
+                    final ScheduledFuture<?> sessionCancelFuture =
+                            scheduledExecutorService.schedule(
+                                    () -> rexster.triggerTimeout(sessionLife, optSession.isPresent()),
+                                    sessionLife, TimeUnit.MILLISECONDS);
+                    rexster.setSessionCancelFuture(sessionCancelFuture);
+                }
+            }
+        } finally {
+            ReferenceCountUtil.release(msg);
+        }
+    }
+
+    protected void validateRequest(final RequestMessage message, final GraphManager graphManager) throws RexsterException {
+        if (!message.optionalArgs(Tokens.ARGS_GREMLIN).isPresent()) {
+            final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", message.getOp(), Tokens.ARGS_GREMLIN);
+            throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+        }
+
+        if (message.optionalArgs(Tokens.ARGS_BINDINGS).isPresent()) {
+            final Map bindings = (Map) message.getArgs().get(Tokens.ARGS_BINDINGS);
+            if (IteratorUtils.anyMatch(bindings.keySet().iterator(), k -> null == k || !(k instanceof String))) {
+                final String msg = String.format("The [%s] message is using one or more invalid binding keys - they must be of type String and cannot be null", Tokens.OPS_EVAL);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            final Set<String> badBindings = IteratorUtils.set(IteratorUtils.<String>filter(bindings.keySet().iterator(), INVALID_BINDINGS_KEYS::contains));
+            if (!badBindings.isEmpty()) {
+                final String msg = String.format("The [%s] message supplies one or more invalid parameters key of [%s] - these are reserved names.", Tokens.OPS_EVAL, badBindings);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            // ignore control bindings that get passed in with the "#jsr223" prefix - those aren't used in compilation
+            if (IteratorUtils.count(IteratorUtils.filter(bindings.keySet().iterator(), k -> !k.toString().startsWith("#jsr223"))) > settings.maxParameters) {
+                final String msg = String.format("The [%s] message contains %s bindings which is more than is allowed by the server %s configuration",
+                        Tokens.OPS_EVAL, bindings.size(), settings.maxParameters);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+        }
+
+        // bytecode must have an alias defined
+        if (message.getOp().equals(Tokens.OPS_BYTECODE)) {
+            final Optional<Map<String, String>> aliases = message.optionalArgs(Tokens.ARGS_ALIASES);
+            if (!aliases.isPresent()) {
+                final String msg = String.format("A message with [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            if (aliases.get().size() != 1 || !aliases.get().containsKey(Tokens.VAL_TRAVERSAL_SOURCE_ALIAS)) {
+                final String msg = String.format("A message with [%s] op code requires the [%s] argument to be a Map containing one alias assignment named '%s'.",
+                        Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES, Tokens.VAL_TRAVERSAL_SOURCE_ALIAS);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            final String traversalSourceBindingForAlias = aliases.get().values().iterator().next();
+            if (!graphManager.getTraversalSourceNames().contains(traversalSourceBindingForAlias)) {
+                final String msg = String.format("The traversal source [%s] for alias [%s] is not configured on the server.", traversalSourceBindingForAlias, Tokens.VAL_TRAVERSAL_SOURCE_ALIAS);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+        }
+    }
+
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
+        // only need to handle this event if the idle monitor is on
+        if (!channelizer.supportsIdleMonitor()) return;
+
+        if (evt instanceof IdleStateEvent) {
+            final IdleStateEvent e = (IdleStateEvent) evt;
+
+            // if no requests (reader) then close, if no writes from server to client then ping. clients should
+            // periodically ping the server, but coming from this direction allows the server to kill channels that
+            // have dead clients on the other end
+            if (e.state() == IdleState.READER_IDLE) {
+                logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel().id().asShortText());
+                ctx.close();
+            } else if (e.state() == IdleState.WRITER_IDLE && settings.keepAliveInterval > 0) {
+                logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel().id().asShortText());
+                ctx.writeAndFlush(channelizer.createIdleDetectionMessage());
+            }
+        }
+    }
+
+    protected Rexster createSingle(final Context gremlinContext, final String sessionId) {
+        return new SingleRexster(gremlinContext, sessionId, sessions);
+    }
+
+    protected Rexster createMulti(final Context gremlinContext, final String sessionId) {
+        return new MultiRexster(gremlinContext, sessionId, sessions);
+    }
+
+    public boolean isActiveSession(final String sessionId) {
+        return sessions.containsKey(sessionId);
+    }
+
+    public int getActiveSessionCount() {
+        return sessions.size();
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
index e4f79b4..a93e1c4 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
@@ -28,9 +28,9 @@ import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
 import org.apache.tinkerpop.gremlin.server.channel.WsAndHttpChannelizer;
 import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
 
+import static org.apache.tinkerpop.gremlin.server.AbstractChannelizer.PIPELINE_HTTP_AGGREGATOR;
 import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_AUTHENTICATOR;
 import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_REQUEST_HANDLER;
-import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_HTTP_RESPONSE_ENCODER;
 
 /**
  * A ChannelInboundHandlerAdapter for use with {@link WsAndHttpChannelizer} that toggles between WebSockets
@@ -66,11 +66,11 @@ public class WsAndHttpChannelizerHandler extends ChannelInboundHandlerAdapter {
                 pipeline.remove(PIPELINE_REQUEST_HANDLER);
                 final ChannelHandler authenticator = pipeline.get(PIPELINE_AUTHENTICATOR);
                 pipeline.remove(PIPELINE_AUTHENTICATOR);
-                pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_AUTHENTICATOR, authenticator);
+                pipeline.addAfter(PIPELINE_HTTP_AGGREGATOR, PIPELINE_AUTHENTICATOR, authenticator);
                 pipeline.addAfter(PIPELINE_AUTHENTICATOR, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
             } else {
                 pipeline.remove(PIPELINE_REQUEST_HANDLER);
-                pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
+                pipeline.addAfter(PIPELINE_HTTP_AGGREGATOR, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
             }
         }
         ctx.fireChannelRead(obj);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
index 55f89e6..deb1734 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
@@ -18,7 +18,14 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizerIntegrateTest;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizerIntegrateTest;
+import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
+import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizerIntegrateTest;
 import org.apache.tinkerpop.gremlin.server.op.OpLoader;
+import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
+import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -96,16 +103,28 @@ public abstract class AbstractGremlinServerIntegrationTest {
         startServer(settings);
     }
 
+    private boolean shouldTestUnified() {
+        // ignore all tests in the UnifiedChannelizerIntegrateTest package as they are already rigged to test
+        // over the various channelizer implementations
+        return Boolean.parseBoolean(System.getProperty("testUnified", "false")) &&
+                !this.getClass().getPackage().equals(UnifiedChannelizerIntegrateTest.class.getPackage());
+    }
+
     public void startServer(final Settings settings) throws Exception {
         if (null == settings) {
             startServer();
         } else {
-            final Settings overridenSettings = overrideSettings(settings);
-            ServerTestHelper.rewritePathsInGremlinServerSettings(overridenSettings);
+            final Settings oSettings = overrideSettings(settings);
+
+            if (shouldTestUnified()) {
+                oSettings.channelizer = UnifiedChannelizer.class.getName();
+            }
+
+            ServerTestHelper.rewritePathsInGremlinServerSettings(oSettings);
             if (GREMLIN_SERVER_EPOLL) {
-                overridenSettings.useEpollEventLoop = true;
+                oSettings.useEpollEventLoop = true;
             }
-            this.server = new GremlinServer(overridenSettings);
+            this.server = new GremlinServer(oSettings);
             server.start().join();
         }
     }
@@ -119,6 +138,10 @@ public abstract class AbstractGremlinServerIntegrationTest {
             overriddenSettings.useEpollEventLoop = true;
         }
 
+        if (shouldTestUnified()) {
+            overriddenSettings.channelizer = UnifiedChannelizer.class.getName();
+        }
+
         this.server = new GremlinServer(overriddenSettings);
 
         server.start().join();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
index f8956e2..e685932 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
@@ -47,7 +47,8 @@ public class ContextTest {
 
     private final ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
     private final RequestMessage request = RequestMessage.build("test").create();
-    private final Context context = new Context(request, ctx, null, null, null, null);
+    private final Settings settings = new Settings();
+    private final Context context = new Context(request, ctx, settings, null, null, null);
     private final Log4jRecordingAppender recordingAppender = new Log4jRecordingAppender();
 
     private Level originalLogLevel;
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 3c1d308..c11cad8 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -42,6 +42,7 @@ import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
 import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.handler.OpExecutorHandler;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -99,6 +100,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assume.assumeThat;
 import static org.mockito.Mockito.verify;
 
 /**
@@ -1645,16 +1647,60 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                 final CompletableFuture<List<Result>> futureThird = third.get().all();
                 final CompletableFuture<List<Result>> futureFourth = fourth.get().all();
 
-                assertFutureTimeout(futureFirst);
-                assertFutureTimeout(futureSecond);
-                assertFutureTimeout(futureThird);
-                assertFutureTimeout(futureFourth);
+                // there is slightly different assertion logic with UnifiedChannelizer given differences in session
+                // behavior where UnifiedChannelizer sessions won't continue processing in the face of a timeout and
+                // a new session will need to be created
+                if (server.getServerGremlinExecutor().getSettings().channelizer.equals(UnifiedChannelizer.class.getName())) {
+                    // first times out and the rest get SERVER_ERROR
+                    try {
+                        futureFirst.get();
+                        fail("Should have timed out");
+                    } catch (Exception ex) {
+                        final Throwable root = ExceptionUtils.getRootCause(ex);
+                        assertThat(root, instanceOf(ResponseException.class));
+                        assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode());
+                        assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
+                    }
+
+                    assertFutureTimeoutUnderUnified(futureSecond);
+                    assertFutureTimeoutUnderUnified(futureThird);
+                    assertFutureTimeoutUnderUnified(futureFourth);
+                } else {
+                    assertFutureTimeout(futureFirst);
+                    assertFutureTimeout(futureSecond);
+                    assertFutureTimeout(futureThird);
+                    assertFutureTimeout(futureFourth);
+                }
             }
         } finally {
             cluster.close();
         }
     }
 
+    private void assertFutureTimeoutUnderUnified(final CompletableFuture<List<Result>> f) {
+        try {
+            f.get();
+            fail("Should have timed out");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(ResponseException.class));
+            assertEquals(ResponseStatusCode.SERVER_ERROR, ((ResponseException) root).getResponseStatusCode());
+            assertThat(root.getMessage(), allOf(startsWith("An earlier request"), endsWith("failed prior to this one having a chance to execute")));
+        }
+    }
+
+    private void assertFutureTimeout(final CompletableFuture<List<Result>> f) {
+        try {
+            f.get();
+            fail("Should have timed out");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(ResponseException.class));
+            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode());
+            assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
+        }
+    }
+
     @Test
     public void shouldCloseAllClientsOnCloseOfCluster() throws Exception {
         final Cluster cluster = TestClientFactory.open();
@@ -1780,20 +1826,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
 
     }
 
-    private void assertFutureTimeout(final CompletableFuture<List<Result>> futureFirst) {
-        try
-        {
-            futureFirst.get();
-            fail("Should have timed out");
-        }
-        catch (Exception ex)
-        {
-            final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root, instanceOf(ResponseException.class));
-            assertThat(root.getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 250 ms"));
-        }
-    }
-
     @Test
     public void shouldClusterReadFileFromResources() throws Exception {
         final Cluster cluster = Cluster.open(TestClientFactory.RESOURCE_PATH);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
index a9d7228..e81f889 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
@@ -31,10 +31,10 @@ import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
 import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
-import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.Krb5Authenticator;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
@@ -81,7 +81,7 @@ public class GremlinServerAuditLogDeprecatedIntegrateTest extends AbstractGremli
         rootLogger.addAppender(recordingAppender);
 
         try {
-            final String moduleBaseDir = System.getProperty("basedir");
+            final String moduleBaseDir = System.getProperty("basedir", ".");
             final String authConfigName = moduleBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
             System.setProperty("java.security.auth.login.config", authConfigName);
             kdcServer = new KdcFixture(moduleBaseDir);
@@ -139,6 +139,7 @@ public class GremlinServerAuditLogDeprecatedIntegrateTest extends AbstractGremli
                 settings.host = "localhost";
                 settings.channelizer = HttpChannelizer.class.getName();
                 authSettings.authenticator = SimpleAuthenticator.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
                 break;
         }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
index 69bb974..28ad1e1 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.Krb5Authenticator;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
@@ -81,7 +82,7 @@ public class GremlinServerAuditLogIntegrateTest extends AbstractGremlinServerInt
         rootLogger.addAppender(recordingAppender);
 
         try {
-            final String moduleBaseDir = System.getProperty("basedir");
+            final String moduleBaseDir = System.getProperty("basedir", ".");
             final String authConfigName = moduleBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
             System.setProperty("java.security.auth.login.config", authConfigName);
             kdcServer = new KdcFixture(moduleBaseDir);
@@ -139,6 +140,7 @@ public class GremlinServerAuditLogIntegrateTest extends AbstractGremlinServerInt
                 settings.host = "localhost";
                 settings.channelizer = HttpChannelizer.class.getName();
                 authSettings.authenticator = SimpleAuthenticator.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
                 break;
         }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
index 4ec467e..7d10a3a 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
@@ -65,7 +65,7 @@ public class GremlinServerAuthKrb5IntegrateTest extends AbstractGremlinServerInt
         handlerLogger.setLevel(Level.OFF);
 
         try {
-            final String projectBaseDir = System.getProperty("basedir");
+            final String projectBaseDir = System.getProperty("basedir", ".");
             final String authConfigName = projectBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
             System.setProperty("java.security.auth.login.config", authConfigName);
             kdcServer = new KdcFixture(projectBaseDir);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
index 5614123..00eacda 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.authz.AllowListAuthorizer;
 import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.gremlin.util.function.Lambda;
@@ -116,10 +117,12 @@ public class GremlinServerAuthzIntegrateTest extends AbstractGremlinServerIntegr
             case "shouldFailAuthorizeWithHttpTransport":
             case "shouldKeepAuthorizingWithHttpTransport":
                 settings.channelizer = HttpChannelizer.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 break;
             case "shouldAuthorizeWithAllowAllAuthenticatorAndHttpTransport":
                 settings.channelizer = HttpChannelizer.class.getName();
                 authSettings.authenticator = AllowAllAuthenticator.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 authSettings.config = null;
                 final String fileHttp = Objects.requireNonNull(getClass().getClassLoader().getResource(yamlHttpName)).getFile();
                 authzSettings.config.put(AllowListAuthorizer.KEY_AUTHORIZATION_ALLOWLIST, fileHttp);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
index fadc5e2..05cd939 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
@@ -36,6 +36,7 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -133,6 +134,7 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra
     private void configureForAuthentication(final Settings settings) {
         final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
         authSettings.authenticator = SimpleAuthenticator.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
 
         // use a credentials graph with two users in it: stephen/password and marko/rainbow-dash
         final Map<String,Object> authConfig = new HashMap<>();
@@ -147,7 +149,7 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra
         authSettings.authenticator = SimpleAuthenticator.class.getName();
 
         //Add basic auth handler to make sure the reflection code path works.
-        authSettings.authenticationHandler = HttpBasicAuthenticationHandler.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
 
         // use a credentials graph with two users in it: stephen/password and marko/rainbow-dash
         final Map<String,Object> authConfig = new HashMap<>();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 4f0922e..270bed2 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.SimpleSandboxExtension;
 import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
+import org.apache.tinkerpop.gremlin.server.handler.UnifiedHandler;
 import org.apache.tinkerpop.gremlin.structure.RemoteGraph;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -58,7 +59,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -80,6 +80,7 @@ import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalS
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.AllOf.allOf;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
 import static org.hamcrest.core.StringStartsWith.startsWith;
@@ -115,9 +116,11 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
 
         if (name.getMethodName().equals("shouldPingChannelIfClientDies") ||
                 name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
-            final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
-            previousLogLevel = webSocketClientHandlerLogger.getLevel();
-            webSocketClientHandlerLogger.setLevel(Level.INFO);
+            final org.apache.log4j.Logger opSelectorHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
+            final org.apache.log4j.Logger unifiedHandlerLogger = org.apache.log4j.Logger.getLogger(UnifiedHandler.class);
+            previousLogLevel = opSelectorHandlerLogger.getLevel();
+            opSelectorHandlerLogger.setLevel(Level.INFO);
+            unifiedHandlerLogger.setLevel(Level.INFO);
         }
 
         rootLogger.addAppender(recordingAppender);
@@ -129,8 +132,10 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
 
         if (name.getMethodName().equals("shouldPingChannelIfClientDies")||
                 name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
-            final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
-            webSocketClientHandlerLogger.setLevel(previousLogLevel);
+            final org.apache.log4j.Logger opSelectorHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
+            opSelectorHandlerLogger.setLevel(previousLogLevel);
+            final org.apache.log4j.Logger unifiedHandlerLogger = org.apache.log4j.Logger.getLogger(UnifiedHandler.class);
+            unifiedHandlerLogger.setLevel(previousLogLevel);
         }
 
         rootLogger.removeAppender(recordingAppender);
@@ -145,6 +150,8 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
         switch (nameOfTest) {
             case "shouldProvideBetterExceptionForMethodCodeTooLarge":
                 settings.maxContentLength = 4096000;
+
+                // OpProcessor setting
                 final Settings.ProcessorSettings processorSettingsBig = new Settings.ProcessorSettings();
                 processorSettingsBig.className = StandardOpProcessor.class.getName();
                 processorSettingsBig.config = new HashMap<String,Object>() {{
@@ -152,6 +159,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                 }};
                 settings.processors.clear();
                 settings.processors.add(processorSettingsBig);
+
+                // Unified setting
+                settings.maxParameters = Integer.MAX_VALUE;
                 break;
             case "shouldRespectHighWaterMarkSettingAndSucceed":
                 settings.writeBufferHighWaterMark = 64;
@@ -182,6 +192,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                 settings.scriptEngines.get("gremlin-groovy").config = getScriptEngineConfForBaseScript();
                 break;
             case "shouldReturnInvalidRequestArgsWhenBindingCountExceedsAllowable":
+                // OpProcessor settings
                 final Settings.ProcessorSettings processorSettingsSmall = new Settings.ProcessorSettings();
                 processorSettingsSmall.className = StandardOpProcessor.class.getName();
                 processorSettingsSmall.config = new HashMap<String,Object>() {{
@@ -189,6 +200,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                 }};
                 settings.processors.clear();
                 settings.processors.add(processorSettingsSmall);
+
+                // Unified settings
+                settings.maxParameters = 1;
                 break;
             case "shouldTimeOutRemoteTraversal":
                 settings.evaluationTimeout = 500;
@@ -699,7 +713,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
     public void shouldReceiveFailureTimeOutOnScriptEval() throws Exception {
         try (SimpleClient client = TestClientFactory.createWebSocketClient()){
             final List<ResponseMessage> responses = client.submit("Thread.sleep(3000);'some-stuff-that-should not return'");
-            assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 1000 ms"));
+            assertThat(responses.get(0).getStatus().getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("1000 ms")));
 
             // validate that we can still send messages to the server
             assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
@@ -715,7 +729,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                     .addArg(Tokens.ARGS_GREMLIN, "Thread.sleep(3000);'some-stuff-that-should not return'")
                     .create();
             final List<ResponseMessage> responses = client.submit(msg);
-            assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 100 ms"));
+            assertThat(responses.get(0).getStatus().getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("100 ms")));
 
             // validate that we can still send messages to the server
             assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 3dc58ae..fe9ff99 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.driver.simple.SimpleClient;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.junit.After;
@@ -94,11 +95,16 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             case "shouldHaveTheSessionTimeout":
             case "shouldCloseSessionOnceOnRequest":
                 settings.processors.clear();
+
+                // OpProcessor setting
                 final Settings.ProcessorSettings processorSettings = new Settings.ProcessorSettings();
                 processorSettings.className = SessionOpProcessor.class.getCanonicalName();
                 processorSettings.config = new HashMap<>();
                 processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L);
                 settings.processors.add(processorSettings);
+
+                // Unified setting
+                settings.sessionLifetimeTimeout = 3000L;
                 break;
             case "shouldCloseSessionOnClientClose":
                 clearNeo4j(settings);
@@ -106,13 +112,27 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             case "shouldEnsureSessionBindingsAreThreadSafe":
                 settings.threadPoolWorker = 2;
                 break;
+            case "shouldUseGlobalFunctionCache":
+                // OpProcessor settings are good by default
+                // UnifiedHandler settings
+                settings.useCommonEngineForSessions = false;
+                settings.useGlobalFunctionCacheForSessions = true;
+
+                break;
             case "shouldNotUseGlobalFunctionCache":
                 settings.processors.clear();
+
+                // OpProcessor settings
                 final Settings.ProcessorSettings processorSettingsForDisableFunctionCache = new Settings.ProcessorSettings();
                 processorSettingsForDisableFunctionCache.className = SessionOpProcessor.class.getCanonicalName();
                 processorSettingsForDisableFunctionCache.config = new HashMap<>();
                 processorSettingsForDisableFunctionCache.config.put(SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, false);
                 settings.processors.add(processorSettingsForDisableFunctionCache);
+
+                // UnifiedHandler settings
+                settings.useCommonEngineForSessions = false;
+                settings.useGlobalFunctionCacheForSessions = false;
+
                 break;
             case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient":
             case "shouldExecuteInSessionWithTransactionManagement":
@@ -124,6 +144,11 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         return settings;
     }
 
+    private boolean isUsingUnifiedChannelizer() {
+        return server.getServerGremlinExecutor().
+                getSettings().channelizer.equals(UnifiedChannelizer.class.getName());
+    }
+
     private static void clearNeo4j(Settings settings) {
         deleteDirectory(new File("/tmp/neo4j"));
         settings.graphs.put("graph", "conf/neo4j-empty.properties");
@@ -140,8 +165,13 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         client1.close();
         cluster1.close();
 
-        assertThat(recordingAppender.getMessages(), hasItem("INFO - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
-        assertThat(recordingAppender.getMessages(), hasItem("INFO - Session shouldCloseSessionOnClientClose closed\n"));
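+        // the session close log messages are only asserted when the UnifiedChannelizer is not in use; otherwise verify through the UnifiedHandler that the session is no longer active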
+        if (isUsingUnifiedChannelizer()) {
+            assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+        } else {
+            assertThat(recordingAppender.getMessages(), hasItem("INFO - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
+            assertThat(recordingAppender.getMessages(), hasItem("INFO - Session shouldCloseSessionOnClientClose closed\n"));
+        }
 
         // try to reconnect to that session and make sure no state is there
         final Cluster clusterReconnect = TestClientFactory.open();
@@ -159,16 +189,27 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         // the commit from client1 should not have gone through so there should be no data present.
         assertEquals(0, clientReconnect.submit("graph.traversal().V().count()").all().join().get(0).getInt());
         clusterReconnect.close();
+
+        if (isUsingUnifiedChannelizer()) {
+            assertEquals(0, ((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().getActiveSessionCount());
+        }
     }
 
     @Test
     public void shouldUseGlobalFunctionCache() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect(name.getMethodName());
+        final Client session = cluster.connect(name.getMethodName());
+        final Client client = cluster.connect();
+
+        assertEquals(3, session.submit("def sumItUp(x,y){x+y};sumItUp(1,2)").all().get().get(0).getInt());
+        assertEquals(3, session.submit("sumItUp(1,2)").all().get().get(0).getInt());
 
         try {
-            assertEquals(3, client.submit("def addItUp(x,y){x+y};addItUp(1,2)").all().get().get(0).getInt());
-            assertEquals(3, client.submit("addItUp(1,2)").all().get().get(0).getInt());
+            client.submit("sumItUp(1,2)").all().get().get(0).getInt();
+            fail("Global functions should not be cached so the call to sumItUp() should fail");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root.getMessage(), startsWith("No signature of method"));
         } finally {
             cluster.close();
         }
@@ -180,14 +221,14 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         final Client client = cluster.connect(name.getMethodName());
 
         try {
-            assertEquals(3, client.submit("def addItUp(x,y){x+y};addItUp(1,2)").all().get().get(0).getInt());
+            assertEquals(3, client.submit("def sumItUp(x,y){x+y};sumItUp(1,2)").all().get().get(0).getInt());
         } catch (Exception ex) {
             cluster.close();
             throw ex;
         }
 
         try {
-            client.submit("addItUp(1,2)").all().get().get(0).getInt();
+            client.submit("sumItUp(1,2)").all().get().get(0).getInt();
             fail("Global functions should not be cached so the call to addItUp() should fail");
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
@@ -273,8 +314,12 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             cluster.close();
         }
 
-        assertEquals(1, recordingAppender.getMessages().stream()
-                .filter(msg -> msg.equals("INFO - Session shouldCloseSessionOnceOnRequest closed\n")).count());
+        if (isUsingUnifiedChannelizer()) {
+            assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+        } else {
+            assertEquals(1, recordingAppender.getMessages().stream()
+                    .filter(msg -> msg.equals("INFO - Session shouldCloseSessionOnceOnRequest closed\n")).count());
+        }
     }
 
     @Test
@@ -308,9 +353,13 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             cluster.close();
         }
 
-        // there will be one for the timeout and a second for closing the cluster
-        assertEquals(2, recordingAppender.getMessages().stream()
-                .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
+        if (isUsingUnifiedChannelizer()) {
+            assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+        } else {
+            // there will be one for the timeout and a second for closing the cluster
+            assertEquals(2, recordingAppender.getMessages().stream()
+                    .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
+        }
     }
 
     @Test
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
index 166764b..f28969c 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
@@ -22,6 +22,7 @@ import org.apache.http.NoHttpResponseException;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.Settings;
 
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.junit.Test;
 import org.junit.Assert;
 
@@ -29,6 +30,9 @@ import java.util.Map;
 import java.util.HashMap;
 import java.net.SocketException;
 
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assume.assumeThat;
+
 public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
 
     @Override
@@ -74,6 +78,7 @@ public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChanneliz
         final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
         final Map<String,Object> authConfig = new HashMap<>();
         authSettings.authenticator = SimpleAuthenticator.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
         authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
         authSettings.config = authConfig;
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
similarity index 58%
copy from gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
copy to gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
index 166764b..a090a20 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
@@ -18,55 +18,28 @@
  */
 package org.apache.tinkerpop.gremlin.server.channel;
 
-import org.apache.http.NoHttpResponseException;
-import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 
-import org.junit.Test;
-import org.junit.Assert;
-
-import java.util.Map;
 import java.util.HashMap;
-import java.net.SocketException;
-
-public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
-
-    @Override
-    public Settings overrideSettings(final Settings settings) {
-        super.overrideSettings(settings);
-        final String nameOfTest = name.getMethodName();
-        if (nameOfTest.equals("shouldBreakOnInvalidAuthenticationHandler") ) {
-            settings.authentication = getAuthSettings();
-            settings.authentication.authenticationHandler = "Foo.class";
-        }
-        return settings;
-    }
+import java.util.Map;
 
-    @Test
-    public void shouldBreakOnInvalidAuthenticationHandler() throws Exception {
-        final CombinedTestClient client =  new CombinedTestClient(getProtocol());
-        try {
-            client.sendAndAssert("2+2", 4);
-            Assert.fail("An exception should be thrown with an invalid authentication handler");
-        } catch (NoHttpResponseException | SocketException e) {
-        } finally {
-            client.close();
-        }
-    }
+public class UnifiedChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
 
     @Override
     public String getProtocol() {
-        return HTTP;
+        return WS_AND_HTTP;
     }
 
     @Override
     public String getSecureProtocol() {
-        return HTTPS;
+        return WSS_AND_HTTPS;
     }
 
     @Override
     public String getChannelizer() {
-        return HttpChannelizer.class.getName();
+        return UnifiedChannelizer.class.getName();
     }
 
     @Override
@@ -74,6 +47,7 @@ public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChanneliz
         final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
         final Map<String,Object> authConfig = new HashMap<>();
         authSettings.authenticator = SimpleAuthenticator.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
         authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
         authSettings.config = authConfig;
 
diff --git a/gremlin-tools/gremlin-benchmark/pom.xml b/gremlin-tools/gremlin-benchmark/pom.xml
index 688f44f..d0cdc0e 100644
--- a/gremlin-tools/gremlin-benchmark/pom.xml
+++ b/gremlin-tools/gremlin-benchmark/pom.xml
@@ -65,11 +65,6 @@ limitations under the License.
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.tinkerpop</groupId>
-            <artifactId>tinkergraph-gremlin</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.openjdk.jmh</groupId>
             <artifactId>jmh-core</artifactId>
             <version>${jmh.version}</version>
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index 3e50091..d04d2e1 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -143,11 +143,6 @@ limitations under the License.
             <version>1.2</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-compress</artifactId>
-            <version>1.19</version>
-        </dependency>
-        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>16.0.1</version>