You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2021/03/29 13:20:44 UTC

[tinkerpop] branch TINKERPOP-2245 updated (8ac6b21 -> 5de513d)

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a change to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git.


 discard 8ac6b21  TINKERPOP-2245 Added UnifiedChannelizer
     new 5de513d  TINKERPOP-2245 Added UnifiedChannelizer

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (8ac6b21)
            \
             N -- N -- N   refs/heads/TINKERPOP-2245 (5de513d)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .travis.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

[tinkerpop] 01/01: TINKERPOP-2245 Added UnifiedChannelizer

Posted by sp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 5de513d9f6859ad633c69ca51162dfbd4e4c60d3
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Thu Mar 25 15:30:25 2021 -0400

    TINKERPOP-2245 Added UnifiedChannelizer
    
    The UnifiedChannelizer consolidates gremlin execution threadpools and unifies/streamlines server request processing between sessionless and sessionful requests.
---
 .travis.yml                                        |   4 +
 CHANGELOG.asciidoc                                 |   1 +
 docs/src/upgrade/release-3.5.x.asciidoc            |  11 +
 .../apache/tinkerpop/gremlin/driver/Handler.java   |   1 -
 .../gremlin/server/AbstractChannelizer.java        |  29 +-
 .../apache/tinkerpop/gremlin/server/Context.java   |  51 ++
 .../tinkerpop/gremlin/server/GremlinServer.java    |  13 +-
 .../apache/tinkerpop/gremlin/server/Settings.java  |  32 +
 .../gremlin/server/channel/HttpChannelizer.java    |   4 +-
 .../gremlin/server/channel/UnifiedChannelizer.java |  77 +++
 .../server/channel/WebSocketChannelizer.java       |  12 +-
 .../handler/AbstractAuthenticationHandler.java     |  12 +-
 .../gremlin/server/handler/AbstractRexster.java    | 693 +++++++++++++++++++++
 .../handler/HttpBasicAuthenticationHandler.java    |  11 +-
 .../gremlin/server/handler/MultiRexster.java       | 201 ++++++
 .../tinkerpop/gremlin/server/handler/Rexster.java  |  84 +++
 .../SaslAndHttpBasicAuthenticationHandler.java     |  25 +-
 .../server/handler/SaslAuthenticationHandler.java  |  11 +-
 .../gremlin/server/handler/SingleRexster.java      |  68 ++
 .../gremlin/server/handler/UnifiedHandler.java     | 283 +++++++++
 .../handler/WsAndHttpChannelizerHandler.java       |   6 +-
 .../AbstractGremlinServerIntegrationTest.java      |  31 +-
 .../tinkerpop/gremlin/server/ContextTest.java      |   3 +-
 .../gremlin/server/GremlinDriverIntegrateTest.java |  68 +-
 ...emlinServerAuditLogDeprecatedIntegrateTest.java |   5 +-
 .../server/GremlinServerAuditLogIntegrateTest.java |   4 +-
 .../server/GremlinServerAuthKrb5IntegrateTest.java |   2 +-
 .../server/GremlinServerAuthzIntegrateTest.java    |   3 +
 .../server/GremlinServerHttpIntegrateTest.java     |   4 +-
 .../gremlin/server/GremlinServerIntegrateTest.java |  30 +-
 .../server/GremlinServerSessionIntegrateTest.java  |  73 ++-
 .../channel/HttpChannelizerIntegrateTest.java      |   5 +
 ...t.java => UnifiedChannelizerIntegrateTest.java} |  42 +-
 gremlin-tools/gremlin-benchmark/pom.xml            |   5 -
 hadoop-gremlin/pom.xml                             |   5 -
 35 files changed, 1790 insertions(+), 119 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index a1456e7..ff3a058 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -60,6 +60,10 @@ jobs:
       name: "gremlin server"
     - script:
         - "mvn clean install -q -DskipTests -Dci"
+        - "travis_wait 60 mvn verify -pl :gremlin-server -DskipTests -DskipIntegrationTests=false -DincludeNeo4j -DtestUnified=true"
+      name: "gremlin server - unified"
+    - script:
+        - "mvn clean install -q -DskipTests -Dci"
         - "mvn verify -pl :gremlin-console -DskipTests -DskipIntegrationTests=false"
       name: "gremlin console"
     - script:
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 9bb33a8..7f67e9b 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,7 @@ This release also includes changes from <<release-3-4-3, 3.4.3>>.
 * Allowed the possibility for the propagation of `null` as a `Traverser` in Gremlin.
 * Exposed websocket connection status in JavaScript driver.
 * Fixed a bug where spark-gremlin was not re-attaching properties when using `dedup()`.
+* Fixed a bug in `WsAndHttpChannelizer` pipeline configuration where failed object aggregation could not write back HTTP responses.
 * Ensured better consistency of the use of `null` as arguments to mutation steps.
 * Added a `ResponseStatusCode` to indicate that a client should retry its request.
 * Added `TemporaryException` interface to indicate that a transaction can be retried.
diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index 45b4769..5504a05 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -390,6 +390,17 @@ these values are not hashable and will result in an error. By introducing a `Has
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-2395[TINKERPOP-2395],
 link:https://issues.apache.org/jira/browse/TINKERPOP-2407[TINKERPOP-2407]
 
+==== Gremlin Server UnifiedChannelizer
+
+Just some notes for later:
+
+* UnifiedChannelizer technically replaces all existing implementations but is not yet the default
+* Some new settings related to it: maxParameters, sessionLifetimeTimeout, useGlobalFunctionCacheForSessions, useCommonEngineForSessions
+* Session behavior shifts slightly under this channelizer for async calls, where a failure will mean that the session
+will close, remaining requests in the queue will be ignored and rollback will occur.
+* Care should be taken with strict transaction management and multi-graph transactions (which aren't real - not a new thing)
+* absolute max lifetime of a session is a new thing
+
 ==== Gremlin Server Audit Logging
 
 The `authentication.enableAuditlog` configuration property is deprecated, but replaced by the `enableAuditLog` property
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index aa15597..11bf8b4 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -222,7 +222,6 @@ final class Handler {
             final ResultQueue queue = pending.get(response.getRequestId());
             if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
                 final Object data = response.getResult().getData();
-                final Map<String,Object> meta = response.getResult().getMeta();
 
                 // this is a "result" from the server which is either the result of a script or a
                 // serialized traversal
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
index fe3c9e8..1b64465 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
@@ -50,6 +50,7 @@ import javax.net.ssl.TrustManagerFactory;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Constructor;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
@@ -98,6 +99,7 @@ public abstract class AbstractChannelizer extends ChannelInitializer<SocketChann
     public static final String PIPELINE_AUTHORIZER = "authorizer";
     public static final String PIPELINE_REQUEST_HANDLER = "request-handler";
     public static final String PIPELINE_HTTP_RESPONSE_ENCODER = "http-response-encoder";
+    public static final String PIPELINE_HTTP_AGGREGATOR = "http-aggregator";
     public static final String PIPELINE_WEBSOCKET_SERVER_COMPRESSION = "web-socket-server-compression-handler";
 
     protected static final String PIPELINE_SSL = "ssl";
@@ -182,10 +184,29 @@ public abstract class AbstractChannelizer extends ChannelInitializer<SocketChann
     protected AbstractAuthenticationHandler createAuthenticationHandler(final Settings settings) {
         try {
             final Class<?> clazz = Class.forName(settings.authentication.authenticationHandler);
-            final Class[] constructorArgs = new Class[2];
-            constructorArgs[0] = Authenticator.class;
-            constructorArgs[1] = Settings.class;
-            return (AbstractAuthenticationHandler) clazz.getDeclaredConstructor(constructorArgs).newInstance(authenticator, settings);
+            AbstractAuthenticationHandler aah;
+            try {
+                // the three arg constructor is the new form as a handler may need the authorizer in some cases
+                final Class<?>[] threeArgForm = new Class[]{Authenticator.class, Authorizer.class, Settings.class};
+                final Constructor<?> twoArgConstructor = clazz.getDeclaredConstructor(threeArgForm);
+                return (AbstractAuthenticationHandler) twoArgConstructor.newInstance(authenticator, authorizer, settings);
+            } catch (Exception threeArgEx) {
+                try {
+                    // the two arg constructor is the "old form" that existed prior to Authorizers. should probably
+                    // deprecate this form
+                    final Class<?>[] twoArgForm = new Class[]{Authenticator.class, Settings.class};
+                    final Constructor<?> twoArgConstructor = clazz.getDeclaredConstructor(twoArgForm);
+
+                    if (authorizer != null) {
+                        logger.warn("There is an authorizer configured but the {} does not have a constructor of ({}, {}, {}) so it cannot be added",
+                                clazz.getName(), Authenticator.class.getSimpleName(), Authorizer.class.getSimpleName(), Settings.class.getSimpleName());
+                    }
+
+                    return (AbstractAuthenticationHandler) twoArgConstructor.newInstance(authenticator, settings);
+                } catch (Exception twoArgEx) {
+                    throw twoArgEx;
+                }
+            }
         } catch (Exception ex) {
             logger.warn(ex.getMessage());
             throw new IllegalStateException(String.format("Could not create/configure AuthenticationHandler %s", settings.authentication.authenticationHandler), ex);
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index 902a788..fcd2072 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -18,16 +18,21 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinScriptChecker;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.server.handler.Frame;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -45,6 +50,11 @@ public class Context {
     private final GremlinExecutor gremlinExecutor;
     private final ScheduledExecutorService scheduledExecutorService;
     private final AtomicBoolean finalResponseWritten = new AtomicBoolean();
+    private final long requestTimeout;
+    private final RequestContentType requestContentType;
+    private final Object gremlinArgument;
+
+    public enum RequestContentType { BYTECODE, SCRIPT, UNKNOWN }
 
     public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx,
                    final Settings settings, final GraphManager graphManager,
@@ -55,6 +65,23 @@ public class Context {
         this.graphManager = graphManager;
         this.gremlinExecutor = gremlinExecutor;
         this.scheduledExecutorService = scheduledExecutorService;
+
+        // order of calls matter as one depends on the next
+        this.gremlinArgument = requestMessage.getArgs().get(Tokens.ARGS_GREMLIN);
+        this.requestContentType = determineRequestContents();
+        this.requestTimeout = determineTimeout();
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public RequestContentType getRequestContentType() {
+        return requestContentType;
+    }
+
+    public Object getGremlinArgument() {
+        return gremlinArgument;
     }
 
     public ScheduledExecutorService getScheduledExecutorService() {
@@ -133,4 +160,28 @@ public class Context {
         }
 
     }
+
+    private RequestContentType determineRequestContents() {
+        if (gremlinArgument instanceof Bytecode)
+            return RequestContentType.BYTECODE;
+        else if (gremlinArgument instanceof String)
+            return RequestContentType.SCRIPT;
+        else
+            return RequestContentType.UNKNOWN;
+    }
+
+    private long determineTimeout() {
+        // timeout override - handle both deprecated and newly named configuration. earlier logic should prevent
+        // both configurations from being submitted at the same time
+        final Map<String, Object> args = requestMessage.getArgs();
+        final long seto = args.containsKey(Tokens.ARGS_EVAL_TIMEOUT) ?
+                ((Number) args.get(Tokens.ARGS_EVAL_TIMEOUT)).longValue() : settings.getEvaluationTimeout();
+
+        // override the timeout if the lifecycle has a value assigned. if the script contains with(timeout)
+        // options then allow that value to override what's provided on the lifecycle
+        final Optional<Long> timeoutDefinedInScript = requestContentType == RequestContentType.SCRIPT ?
+                GremlinScriptChecker.parse(gremlinArgument.toString()).getTimeout() : Optional.empty();
+
+        return timeoutDefinedInScript.orElse(seto);
+    }
 }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
index 2a1adf7..b322aa7 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
@@ -78,6 +78,7 @@ public class GremlinServer {
     private final ExecutorService gremlinExecutorService;
     private final ServerGremlinExecutor serverGremlinExecutor;
     private final boolean isEpollEnabled;
+    private Channelizer channelizer;
 
     /**
      * Construct a Gremlin Server instance from {@link Settings}.
@@ -159,7 +160,7 @@ public class GremlinServer {
                 }
             });
 
-            final Channelizer channelizer = createChannelizer(settings);
+            channelizer = createChannelizer(settings);
             channelizer.init(serverGremlinExecutor);
             b.group(bossGroup, workerGroup)
                     .childHandler(channelizer);
@@ -284,8 +285,10 @@ public class GremlinServer {
             }
 
             try {
-                if (gremlinExecutorService != null)
-                    gremlinExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS);
+                if (gremlinExecutorService != null) {
+                    if (!gremlinExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS))
+                        logger.warn("Gremlin thread pool did not fully terminate - continuing with shutdown process");
+                }
             } catch (InterruptedException ie) {
                 logger.warn("Timeout waiting for Gremlin thread pool to shutdown - continuing with shutdown process.");
             }
@@ -330,6 +333,10 @@ public class GremlinServer {
         return serverGremlinExecutor;
     }
 
+    public Channelizer getChannelizer() {
+        return channelizer;
+    }
+
     public static void main(final String[] args) throws Exception {
         // add to vm options: -Dlog4j.configuration=file:conf/log4j.properties
         printHeader();
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
index 3a535ab..3f84c19 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
 import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
 import org.apache.tinkerpop.gremlin.server.handler.AbstractAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.server.util.DefaultGraphManager;
@@ -197,6 +198,37 @@ public class Settings {
     public String graphManager = DefaultGraphManager.class.getName();
 
     /**
+     * Maximum number of parameters that can be passed on a request. Larger numbers may impact performance for scripts.
+     * The default is 16 and this setting only applies to the {@link UnifiedChannelizer}.
+     */
+    public int maxParameters = 16;
+
+    /**
+     * The time in milliseconds that a {@link UnifiedChannelizer} session can exist. A session's lifetime cannot be
+     * extended beyond this value irrespective of the number of requests and their individual timeouts. Requests must
+     * complete within this time frame. The default is 10 minutes.
+     */
+    public long sessionLifetimeTimeout = 600000;
+
+    /**
+     * Enable the global function cache for sessions when using the {@link UnifiedChannelizer}. This setting is only
+     * relevant when {@link #useCommonEngineForSessions} is {@code false}. When {@code true} it means that
+     * functions created in one request to a session remain available on the next request to that session.
+     */
+    public boolean useGlobalFunctionCacheForSessions = true;
+
+    /**
+     * When {@code true} and using the {@link UnifiedChannelizer} the same engine that will be used to serve
+     * sessionless requests will also be used to serve sessions. The default value of {@code true} is recommended as
+     * it reduces the amount of object creation required for each session and should generally lead to better
+     * performance especially if the expectation is that there will be many sessions being created and destroyed
+     * rapidly. Setting this value to {@code false} is mostly present to support specific use cases that may require
+     * each session having its own engine or to match previous functionality provided by the older channelizers
+     * produced prior to 3.5.0.
+     */
+    public boolean useCommonEngineForSessions = true;
+
+    /**
      * Configured metrics for Gremlin Server.
      */
     public ServerMetrics metrics = null;
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
index 7b3ad99..be2a7af 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
@@ -62,7 +62,9 @@ public class HttpChannelizer extends AbstractChannelizer {
         if (logger.isDebugEnabled())
             pipeline.addLast(new LoggingHandler("http-io", LogLevel.DEBUG));
 
-        pipeline.addLast(new HttpObjectAggregator(settings.maxContentLength));
+        final HttpObjectAggregator aggregator = new HttpObjectAggregator(settings.maxContentLength);
+        aggregator.setMaxCumulationBufferComponents(settings.maxAccumulationBufferComponents);
+        pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator);
 
         if (authenticator != null) {
             // Cannot add the same handler instance multiple times unless
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java
new file mode 100644
index 0000000..522baa2
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.channel;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.tinkerpop.gremlin.server.AbstractChannelizer;
+import org.apache.tinkerpop.gremlin.server.Channelizer;
+import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
+import org.apache.tinkerpop.gremlin.server.handler.UnifiedHandler;
+import org.apache.tinkerpop.gremlin.server.handler.WsAndHttpChannelizerHandler;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
+
+/**
+ * A {@link Channelizer} that supports websocket and HTTP requests and does so with the most streamlined processing
+ * model for Gremlin Server introduced with 3.5.0.
+ */
+public class UnifiedChannelizer extends AbstractChannelizer {
+
+    private WsAndHttpChannelizerHandler handler;
+    private UnifiedHandler unifiedHandler;
+    protected static final String PIPELINE_UNIFIED = "unified";
+
+    @Override
+    public void init(final ServerGremlinExecutor serverGremlinExecutor) {
+        super.init(serverGremlinExecutor);
+        handler = new WsAndHttpChannelizerHandler();
+        handler.init(serverGremlinExecutor, new HttpGremlinEndpointHandler(serializers, gremlinExecutor, graphManager, settings));
+
+        // these handlers don't share any state and can thus be initialized once per pipeline
+        unifiedHandler = new UnifiedHandler(settings, graphManager, gremlinExecutor, scheduledExecutorService, this);
+    }
+
+    @Override
+    public void configure(final ChannelPipeline pipeline) {
+        handler.configure(pipeline);
+        pipeline.addAfter(PIPELINE_HTTP_REQUEST_DECODER, "WsAndHttpChannelizerHandler", handler);
+    }
+
+    @Override
+    public void finalize(final ChannelPipeline pipeline) {
+        super.finalize(pipeline);
+        pipeline.remove(PIPELINE_OP_SELECTOR);
+        pipeline.remove(PIPELINE_OP_EXECUTOR);
+
+        pipeline.addLast(PIPELINE_UNIFIED, unifiedHandler);
+    }
+
+    public UnifiedHandler getUnifiedHandler() {
+        return unifiedHandler;
+    }
+
+    @Override
+    public boolean supportsIdleMonitor() {
+        return true;
+    }
+
+    @Override
+    public Object createIdleDetectionMessage() {
+        return handler.getWsChannelizer().createIdleDetectionMessage();
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
index c4ff402..a1864fd 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
@@ -81,8 +81,11 @@ public class WebSocketChannelizer extends AbstractChannelizer {
 
     @Override
     public void configure(final ChannelPipeline pipeline) {
+
         if (logger.isDebugEnabled())
-            pipeline.addLast(new LoggingHandler("log-io", LogLevel.DEBUG));
+            pipeline.addLast(new LoggingHandler("log-encoder-aggregator", LogLevel.DEBUG));
+
+        pipeline.addLast(PIPELINE_HTTP_RESPONSE_ENCODER, new HttpResponseEncoder());
 
         logger.debug("HttpRequestDecoder settings - maxInitialLineLength={}, maxHeaderSize={}, maxChunkSize={}",
                 settings.maxInitialLineLength, settings.maxHeaderSize, settings.maxChunkSize);
@@ -95,12 +98,7 @@ public class WebSocketChannelizer extends AbstractChannelizer {
                 settings.maxContentLength, settings.maxAccumulationBufferComponents);
         final HttpObjectAggregator aggregator = new HttpObjectAggregator(settings.maxContentLength);
         aggregator.setMaxCumulationBufferComponents(settings.maxAccumulationBufferComponents);
-        pipeline.addLast("aggregator", aggregator);
-
-        if (logger.isDebugEnabled())
-            pipeline.addLast(new LoggingHandler("log-aggregator-encoder", LogLevel.DEBUG));
-
-        pipeline.addLast(PIPELINE_HTTP_RESPONSE_ENCODER, new HttpResponseEncoder());
+        pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator);
         // Add compression extension for WebSocket defined in https://tools.ietf.org/html/rfc7692
         pipeline.addLast(PIPELINE_WEBSOCKET_SERVER_COMPRESSION, new WebSocketServerCompressionHandler());
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
index 026ad59..074e4ab 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
@@ -21,15 +21,25 @@ package org.apache.tinkerpop.gremlin.server.handler;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 
 /**
  * Provides an abstraction point to allow for http auth schemes beyond basic auth.
  */
 public abstract class AbstractAuthenticationHandler extends ChannelInboundHandlerAdapter {
     protected final Authenticator authenticator;
+    protected final Authorizer authorizer;
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #AbstractAuthenticationHandler(Authenticator, Authorizer)}.
+     */
+    @Deprecated
     public AbstractAuthenticationHandler(final Authenticator authenticator) {
-        this.authenticator = authenticator;
+        this(authenticator, null);
     }
 
+    public AbstractAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer) {
+        this.authenticator = authenticator;
+        this.authorizer = authorizer;
+    }
 }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
new file mode 100644
index 0000000..118e852
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
@@ -0,0 +1,693 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import groovy.lang.GroovyRuntimeException;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.TimedInterruptTimeoutException;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
+import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.server.GremlinServer;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
+import org.apache.tinkerpop.gremlin.server.util.ExceptionHelper;
+import org.apache.tinkerpop.gremlin.server.util.TraverserIterator;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.util.TemporaryException;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.codehaus.groovy.control.MultipleCompilationErrorsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.Bindings;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+/**
+ * A base implementation of {@link Rexster} which offers some common functionality that matches typical Gremlin Server
+ * request response expectations for script, bytecode and graph operations. The class is designed to be extended but
+ * take care in understanding the way that different methods are called as they do depend on one another a bit. It
+ * may be best to examine the source code to determine how best to use this class or to extend from the higher order
+ * classes of {@link SingleRexster} or {@link MultiRexster}.
+ */
+public abstract class AbstractRexster implements Rexster, AutoCloseable {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractRexster.class);
+    private static final Logger auditLogger = LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME);
+
+    private final Channel initialChannel;
+    private final boolean transactionManaged;
+    private final String sessionId;
+    private final AtomicReference<ScheduledFuture<?>> sessionCancelFuture = new AtomicReference<>();
+    private final AtomicReference<Future<?>> sessionFuture = new AtomicReference<>();
+    private long actualTimeoutLength = 0;
+    private boolean actualTimeoutCausedBySession = false;
+    protected final GraphManager graphManager;
+    protected final ConcurrentMap<String, Rexster> sessions;
+    protected final Set<String> aliasesUsedByRexster = new HashSet<>();
+
+    /**
+     * Creates the instance and ties its cleanup to the closing of the channel of the initiating request.
+     *
+     * @param gremlinContext the context of the request that triggered creation which supplies the channel and
+     *                       the {@link GraphManager}
+     * @param sessionId the key this instance is tracked under in {@code sessions}
+     * @param transactionManaged when {@code true} every request handled here has its transaction managed,
+     *                           otherwise the request itself decides
+     * @param sessions the shared registry of active instances which {@link #close()} checks and removes from
+     */
+    AbstractRexster(final Context gremlinContext, final String sessionId,
+                    final boolean transactionManaged, final ConcurrentMap<String, Rexster> sessions) {
+        this.transactionManaged = transactionManaged;
+        this.sessionId = sessionId;
+        this.initialChannel = gremlinContext.getChannelHandlerContext().channel();
+        this.sessions = sessions;
+        this.graphManager = gremlinContext.getGraphManager();
+
+        // close Rexster if the channel closes to cleanup and close transactions. the listener is registered
+        // only after all fields are assigned because a listener added to an already completed future can be
+        // notified right away and close() reads "sessions" - registering earlier could run close() against a
+        // partially constructed instance.
+        this.initialChannel.closeFuture().addListener(f -> {
+            // cancel session worker or it will keep waiting for items to appear in the session queue
+            final Future<?> sf = sessionFuture.get();
+            if (sf != null && !sf.isDone()) {
+                sf.cancel(true);
+            }
+            close();
+        });
+    }
+
+    /**
+     * Gets the transaction management flag supplied at construction.
+     */
+    public boolean isTransactionManaged() {
+        return this.transactionManaged;
+    }
+
+    /**
+     * Gets the session identifier supplied at construction, which is also the key {@link #close()} removes
+     * from the session registry.
+     */
+    public String getSessionId() {
+        return this.sessionId;
+    }
+
+    /**
+     * Determines if the supplied channel is the very same instance as the channel of the request that
+     * created this object.
+     */
+    public boolean isBoundTo(final Channel channel) {
+        return this.initialChannel == channel;
+    }
+
+    /**
+     * Gets the timeout length recorded by {@link #triggerTimeout(long, boolean)} which remains at zero until
+     * a timeout has actually been recorded.
+     */
+    public long getActualTimeoutLength() {
+        return this.actualTimeoutLength;
+    }
+
+    /**
+     * Gets the flag recorded by {@link #triggerTimeout(long, boolean)} that tells whether the timeout that
+     * fired was attributed to the session as opposed to an individual evaluation.
+     */
+    public boolean isActualTimeoutCausedBySession() {
+        return this.actualTimeoutCausedBySession;
+    }
+
+    /**
+     * Looks up the {@link GremlinScriptEngine} registered under the supplied language name using the script
+     * engine manager of the executor held by the {@link Context}.
+     *
+     * @param context the Gremlin Server {@link Context} that provides the executor
+     * @param language the name of the script engine to retrieve
+     */
+    public GremlinScriptEngine getScriptEngine(final Context context, final String language) {
+        return context.getGremlinExecutor()
+                .getScriptEngineManager()
+                .getEngineByName(language);
+    }
+
+    /**
+     * Stores the future that cancels the session. It may only be assigned a single time.
+     *
+     * @throws IllegalStateException if a cancellation future was assigned previously
+     */
+    @Override
+    public void setSessionCancelFuture(final ScheduledFuture<?> f) {
+        final boolean assigned = sessionCancelFuture.compareAndSet(null, f);
+        if (!assigned) {
+            throw new IllegalStateException("Session cancellation future is already set");
+        }
+    }
+
+    /**
+     * Stores the future that represents the running session. It may only be assigned a single time.
+     *
+     * @throws IllegalStateException if a session future was assigned previously
+     */
+    @Override
+    public void setSessionFuture(final Future<?> f) {
+        final boolean assigned = sessionFuture.compareAndSet(null, f);
+        if (!assigned) {
+            throw new IllegalStateException("Session future is already set");
+        }
+    }
+
+    /**
+     * Records the details of a timeout and cancels the session future with interruption. Nothing is recorded
+     * or cancelled if the session future has not been set or has already completed.
+     *
+     * @param timeout the length of the timeout that fired
+     * @param causedBySession whether the timeout is attributed to the session rather than an evaluation
+     */
+    @Override
+    public synchronized void triggerTimeout(final long timeout, final boolean causedBySession) {
+        // triggering timeout triggers the stop of the Rexster Runnable which will end in close()
+        // for final cleanup
+        final Future<?> f = sessionFuture.get();
+        if (f == null || f.isDone()) return;
+
+        actualTimeoutCausedBySession = causedBySession;
+        actualTimeoutLength = timeout;
+
+        // cancel the future that was just null-checked rather than reading the reference a second time
+        f.cancel(true);
+    }
+
+    /**
+     * Evaluates the Gremlin found on the request, which is either {@link Bytecode} or a script, writes an
+     * audit log entry and then iterates any result back to the client. Any failure along the way is handed to
+     * {@link #handleException(Context, Throwable)}.
+     *
+     * @param gremlinContext The Gremlin Server {@link Context} object containing settings, request message, etc.
+     */
+    protected void process(final Context gremlinContext) throws RexsterException {
+        final RequestMessage requestMessage = gremlinContext.getRequestMessage();
+        final Map<String, Object> requestArgs = requestMessage.getArgs();
+        final Object gremlin = requestArgs.get(Tokens.ARGS_GREMLIN);
+
+        // for strict transactions track the aliases used so that we can commit them and only them on close()
+        if (gremlinContext.getSettings().strictTransactionManagement) {
+            requestMessage.optionalArgs(Tokens.ARGS_ALIASES).ifPresent(
+                    aliases -> aliasesUsedByRexster.addAll(((Map<String,String>) aliases).values()));
+        }
+
+        try {
+            // the result is optional as Bytecode could be a "graph operation" rather than a Traversal. graph
+            // operations don't need to be iterated and handle their own lifecycle
+            final Optional<Iterator<?>> results;
+            if (gremlin instanceof Bytecode) {
+                results = fromBytecode(gremlinContext, (Bytecode) gremlin);
+            } else {
+                results = Optional.of(fromScript(gremlinContext, (String) gremlin));
+            }
+
+            processAuditLog(gremlinContext.getSettings(), gremlinContext.getChannelHandlerContext(), gremlin);
+
+            if (results.isPresent()) {
+                handleIterator(gremlinContext, results.get());
+            }
+        } catch (Exception ex) {
+            handleException(gremlinContext, ex);
+        }
+    }
+
+    protected void handleException(final Context gremlinContext, final Throwable t) throws RexsterException {
+        if (t instanceof RexsterException) throw (RexsterException) t;
+
+        final Optional<Throwable> possibleTemporaryException = determineIfTemporaryException(t);
+        if (possibleTemporaryException.isPresent()) {
+            final Throwable temporaryException = possibleTemporaryException.get();
+            throw new RexsterException(temporaryException.getMessage(), t,
+                    ResponseMessage.build(gremlinContext.getRequestMessage())
+                            .code(ResponseStatusCode.SERVER_ERROR_TEMPORARY)
+                            .statusMessage(temporaryException.getMessage())
+                            .statusAttributeException(temporaryException).create());
+        }
+
+        final Throwable root = ExceptionUtils.getRootCause(t);
+
+        if (root instanceof TimedInterruptTimeoutException) {
+            // occurs when the TimedInterruptCustomizerProvider is in play
+            final String msg = String.format("A timeout occurred within the script during evaluation of [%s] - consider increasing the limit given to TimedInterruptCustomizerProvider",
+                    gremlinContext.getRequestMessage().getRequestId());
+            throw new RexsterException(msg, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .statusMessage("Timeout during script evaluation triggered by TimedInterruptCustomizerProvider")
+                    .create());
+        }
+
+        if (root instanceof TimeoutException) {
+            final String errorMessage = String.format("Script evaluation exceeded the configured threshold for request [%s]",
+                    gremlinContext.getRequestMessage().getRequestId());
+            throw new RexsterException(errorMessage, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .statusMessage(t.getMessage())
+                    .create());
+        }
+
+        if (root instanceof InterruptedException ||
+                root instanceof TraversalInterruptedException ||
+                root instanceof InterruptedIOException) {
+            final String msg = actualTimeoutCausedBySession ?
+                    String.format("Session closed - sessionLifetimeTimeout of %s ms exceeded", actualTimeoutLength) :
+                    String.format("Evaluation exceeded timeout threshold of %s ms", actualTimeoutLength);
+            throw new RexsterException(msg, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                    .statusMessage(msg).create());
+        }
+
+        if (root instanceof MultipleCompilationErrorsException && root.getMessage().contains("Method too large") &&
+                ((MultipleCompilationErrorsException) root).getErrorCollector().getErrorCount() == 1) {
+            final String errorMessage = String.format("The Gremlin statement that was submitted exceeds the maximum compilation size allowed by the JVM, please split it into multiple smaller statements - %s", trimMessage(gremlinContext.getRequestMessage()));
+            logger.warn(errorMessage);
+            throw new RexsterException(errorMessage, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+                    .statusMessage(errorMessage)
+                    .statusAttributeException(root).create());
+        }
+
+        // GroovyRuntimeException will hit a pretty wide range of eval type errors, like MissingPropertyException,
+        // CompilationFailedException, MissingMethodException, etc. If more specific handling is required then
+        // try to catch it earlier above.
+        if (root instanceof GroovyRuntimeException ||
+                root instanceof VerificationException ||
+                root instanceof ScriptException) {
+            throw new RexsterException(root.getMessage(), root, ResponseMessage.build(gremlinContext.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+                    .statusMessage(root.getMessage())
+                    .statusAttributeException(root).create());
+        }
+
+        throw new RexsterException(root.getClass().getSimpleName() + ": " + root.getMessage(), root,
+                ResponseMessage.build(gremlinContext.getRequestMessage())
+                        .code(ResponseStatusCode.SERVER_ERROR)
+                        .statusAttributeException(root)
+                        .statusMessage(root.getMessage()).create());
+    }
+
+    /**
+     * Used to decrease the size of a Gremlin script that triggered a "method too large" exception so that it
+     * doesn't log a massive text string nor return a large error message. The supplied message is copied and
+     * the Gremlin argument of the copy is cut to its first 1021 characters followed by "...". A Gremlin
+     * argument that is absent, {@code null} or already within that length is left untouched rather than
+     * failing on the truncation.
+     *
+     * @param msg the request to copy
+     * @return the copy of the request with the shortened Gremlin argument
+     */
+    private RequestMessage trimMessage(final RequestMessage msg) {
+        final int maxLength = 1021;
+        final RequestMessage trimmedMsg = RequestMessage.from(msg).create();
+        final Object gremlin = trimmedMsg.getArgs().get(Tokens.ARGS_GREMLIN);
+        if (gremlin != null) {
+            final String text = gremlin.toString();
+
+            // substring() throws if the end index exceeds the length so only truncate when there is an excess
+            if (text.length() > maxLength)
+                trimmedMsg.getArgs().put(Tokens.ARGS_GREMLIN, text.substring(0, maxLength) + "...");
+        }
+
+        return trimmedMsg;
+    }
+
+    /**
+     * Check if any exception in the chain is TemporaryException then we should respond with the right error code so
+     * that the client knows to retry.
+     */
+    protected Optional<Throwable> determineIfTemporaryException(final Throwable ex) {
+        return Stream.of(ExceptionUtils.getThrowables(ex)).
+                filter(i -> i instanceof TemporaryException).findFirst();
+    }
+
+    /**
+     * Removes this instance from the session registry and cancels the session cancellation future if one
+     * was set and it has not completed. Calls made once the session id is no longer registered do nothing.
+     */
+    @Override
+    public synchronized void close() {
+        // already closing/closed
+        final boolean registered = sessions.containsKey(sessionId);
+        if (!registered) return;
+
+        sessions.remove(sessionId);
+
+        final ScheduledFuture<?> cancelFuture = sessionCancelFuture.get();
+        if (cancelFuture != null && !cancelFuture.isDone()) {
+            cancelFuture.cancel(true);
+        }
+    }
+
+    /**
+     * Evaluates a script with the script engine named by the language argument of the request, falling back
+     * to "gremlin-groovy" when the request does not carry that argument. The evaluation is given the worker
+     * bindings merged with those of the request.
+     *
+     * @param gremlinContext The Gremlin Server {@link Context} object containing settings, request message, etc.
+     * @param script the script to evaluate
+     * @return the evaluation result as an {@link Iterator}
+     */
+    protected Iterator<?> fromScript(final Context gremlinContext, final String script) throws Exception {
+        final Map<String, Object> requestArgs = gremlinContext.getRequestMessage().getArgs();
+        final String language = (String) requestArgs.getOrDefault(Tokens.ARGS_LANGUAGE, "gremlin-groovy");
+
+        // resolve the engine before the bindings are assembled
+        final GremlinScriptEngine engine = getScriptEngine(gremlinContext, language);
+        final Bindings evalBindings = mergeBindingsFromRequest(gremlinContext, getWorkerBindings());
+        return IteratorUtils.asIterator(engine.eval(script, evalBindings));
+    }
+
+    /**
+     * Handles {@link Bytecode} from the request. The {@link TraversalSource} to use is the one named by the
+     * value of the first entry of the aliases on the request. A "graph operation" is passed to
+     * {@link #handleGraphOperation(Context, Bytecode, Graph)} and produces nothing to iterate, while anything
+     * else is translated to a traversal which has its strategies applied and is wrapped for iteration.
+     *
+     * @param gremlinContext The Gremlin Server {@link Context} object containing settings, request message, etc.
+     * @param bytecode the bytecode to process
+     * @return the results to iterate or empty for a graph operation
+     */
+    protected Optional<Iterator<?>> fromBytecode(final Context gremlinContext, final Bytecode bytecode) throws Exception {
+        final RequestMessage requestMessage = gremlinContext.getRequestMessage();
+
+        final Map<String, String> aliases = (Map<String, String>) requestMessage.optionalArgs(Tokens.ARGS_ALIASES).get();
+        final GraphManager manager = gremlinContext.getGraphManager();
+        final String sourceName = aliases.entrySet().iterator().next().getValue();
+        final TraversalSource source = manager.getTraversalSource(sourceName);
+
+        // handle bytecode based graph operations like commit/rollback commands
+        if (BytecodeHelper.isGraphOperation(bytecode)) {
+            handleGraphOperation(gremlinContext, bytecode, source.getGraph());
+            return Optional.empty();
+        }
+
+        // bytecode with lambdas has to go through the script engine for the language of the lambda
+        final Traversal.Admin<?, ?> traversal;
+        final Optional<String> lambdaLanguage = BytecodeHelper.getLambdaLanguage(bytecode);
+        if (lambdaLanguage.isPresent()) {
+            final SimpleBindings lambdaBindings = new SimpleBindings();
+            lambdaBindings.put(sourceName, source);
+            traversal = gremlinContext.getGremlinExecutor().getScriptEngineManager().getEngineByName(lambdaLanguage.get()).eval(bytecode, lambdaBindings, sourceName);
+        } else {
+            traversal = JavaTranslator.of(source).translate(bytecode);
+        }
+
+        // compile the traversal - without it getEndStep() has nothing in it
+        traversal.applyStrategies();
+
+        return Optional.of(new TraverserIterator(traversal));
+    }
+
+    protected Bindings getWorkerBindings() throws RexsterException {
+        return new SimpleBindings(graphManager.getAsBindings());
+    }
+
+    protected Bindings mergeBindingsFromRequest(final Context gremlinContext, final Bindings bindings) throws RexsterException {
+        // alias any global bindings to a different variable.
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        if (msg.getArgs().containsKey(Tokens.ARGS_ALIASES)) {
+            final Map<String, String> aliases = (Map<String, String>) msg.getArgs().get(Tokens.ARGS_ALIASES);
+            for (Map.Entry<String,String> aliasKv : aliases.entrySet()) {
+                boolean found = false;
+
+                // first check if the alias refers to a Graph instance
+                final Graph graph = gremlinContext.getGraphManager().getGraph(aliasKv.getValue());
+                if (null != graph) {
+                    bindings.put(aliasKv.getKey(), graph);
+                    found = true;
+                }
+
+                // if the alias wasn't found as a Graph then perhaps it is a TraversalSource - it needs to be
+                // something
+                if (!found) {
+                    final TraversalSource ts = gremlinContext.getGraphManager().getTraversalSource(aliasKv.getValue());
+                    if (null != ts) {
+                        bindings.put(aliasKv.getKey(), ts);
+                        found = true;
+                    }
+                }
+
+                // this validation is important to calls to GraphManager.commit() and rollback() as they both
+                // expect that the aliases supplied are valid
+                if (!found) {
+                    final String error = String.format("Could not alias [%s] to [%s] as [%s] not in the Graph or TraversalSource global bindings",
+                            aliasKv.getKey(), aliasKv.getValue(), aliasKv.getValue());
+                    throw new RexsterException(error, ResponseMessage.build(msg)
+                            .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
+                }
+            }
+        } else {
+            // there's no bindings so determine if that's ok with Gremlin Server
+            if (gremlinContext.getSettings().strictTransactionManagement) {
+                final String error = "Gremlin Server is configured with strictTransactionManagement as 'true' - the 'aliases' arguments must be provided";
+                throw new RexsterException(error, ResponseMessage.build(msg)
+                        .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
+            }
+        }
+
+        // add any bindings to override any other supplied
+        Optional.ofNullable((Map<String, Object>) msg.getArgs().get(Tokens.ARGS_BINDINGS)).ifPresent(bindings::putAll);
+        return bindings;
+    }
+
+    /**
+     * Provides a generic way of iterating a result set back to the client.
+     * <p/>
+     * An iterator without any items produces a single {@code NO_CONTENT} response. Otherwise results are
+     * gathered into batches, sized by the batch size argument of the request or else by
+     * {@code resultIterationBatchSize} from the settings, and each batch is written as {@code PARTIAL_CONTENT}
+     * with the last one written as {@code SUCCESS}. When transactions are managed for the request a commit
+     * is issued before the {@code NO_CONTENT} or final {@code SUCCESS} response is written, while a rollback
+     * is issued if the channel becomes inactive or if the frame cannot be created - iteration stops in both of
+     * those cases. If the channel is not writable the loop sleeps for 10 milliseconds at a time until it is.
+     *
+     * @param gremlinContext The Gremlin Server {@link Context} object containing settings, request message, etc.
+     * @param itty The result to iterate
+     * @throws InterruptedException if the executing thread is found to be interrupted between iterations or
+     * is interrupted while pausing for the client to catch up
+     */
+    protected void handleIterator(final Context gremlinContext, final Iterator<?> itty) throws InterruptedException {
+        final ChannelHandlerContext nettyContext = gremlinContext.getChannelHandlerContext();
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final Settings settings = gremlinContext.getSettings();
+        boolean warnOnce = false;
+
+        // sessionless requests are always transaction managed, but in-session requests are configurable.
+        final boolean managedTransactionsForRequest = transactionManaged ?
+                true : (Boolean) msg.getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false);
+
+        // we have an empty iterator - happens on stuff like: g.V().iterate()
+        if (!itty.hasNext()) {
+            final Map<String, Object> attributes = generateStatusAttributes(gremlinContext,ResponseStatusCode.NO_CONTENT, itty);
+            // as there is nothing left to iterate if we are transaction managed then we should execute a
+            // commit here before we send back a NO_CONTENT which implies success
+            if (managedTransactionsForRequest)
+                closeTransaction(gremlinContext, Transaction.Status.COMMIT);
+
+            gremlinContext.writeAndFlush(ResponseMessage.build(msg)
+                    .code(ResponseStatusCode.NO_CONTENT)
+                    .statusAttributes(attributes)
+                    .create());
+            return;
+        }
+
+        // the batch size can be overridden by the request
+        final int resultIterationBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
+                .orElse(settings.resultIterationBatchSize);
+        List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
+
+        // use an external control to manage the loop as opposed to just checking hasNext() in the while.  this
+        // prevents situations where auto transactions create a new transaction after calls to commit() within
+        // the loop on calls to hasNext().
+        boolean hasMore = itty.hasNext();
+
+        while (hasMore) {
+            if (Thread.interrupted()) throw new InterruptedException();
+
+            // check if an implementation needs to force flush the aggregated results before the iteration batch
+            // size is reached.
+            // todo: what implementation does this?! can we kill it going forward - seems always false
+            // final boolean forceFlush = isForceFlushed(nettyContext, msg, itty);
+            final boolean forceFlush = false;
+
+            // have to check the aggregate size because it is possible that the channel is not writeable (below)
+            // so iterating next() if the message is not written and flushed would bump the aggregate size beyond
+            // the expected resultIterationBatchSize.  Total serialization time for the response remains in
+            // effect so if the client is "slow" it may simply timeout.
+            //
+            // there is a need to check hasNext() on the iterator because if the channel is not writeable the
+            // previous pass through the while loop will have next()'d the iterator and if it is "done" then a
+            // NoSuchElementException will raise its head. also need a check to ensure that this iteration doesn't
+            // require a forced flush which can be forced by sub-classes.
+            //
+            // this could be placed inside the isWriteable() portion of the if-then below but it seems better to
+            // allow iteration to continue into a batch if that is possible rather than just doing nothing at all
+            // while waiting for the client to catch up
+            if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next());
+
+            // Don't keep executor busy if client has already given up; there is no way to catch up if the channel is
+            // not active, and hence we should break the loop.
+            if (!nettyContext.channel().isActive()) {
+                if (managedTransactionsForRequest) {
+                    closeTransaction(gremlinContext, Transaction.Status.ROLLBACK);
+                }
+                break;
+            }
+
+            // send back a page of results if batch size is met or if it's the end of the results being iterated.
+            // also check writeability of the channel to prevent OOME for slow clients.
+            //
+            // clients might decide to close the Netty channel to the server with a CloseWebsocketFrame after errors
+            // like CorruptedFrameException. On the server, although the channel gets closed, there might be some
+            // executor threads waiting for watermark to clear which will not clear in these cases since client has
+            // already given up on these requests. This leads to these executors waiting for the client to consume
+            // results till the timeout. checking for isActive() should help prevent that.
+            if (nettyContext.channel().isActive() && nettyContext.channel().isWritable()) {
+                if (forceFlush || aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
+                    final ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
+                    Frame frame = null;
+                    try {
+                        frame = makeFrame(gremlinContext, aggregate, code, itty);
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+
+                        // exception is handled in makeFrame() - serialization error gets written back to driver
+                        // at that point
+                        if (managedTransactionsForRequest)
+                            closeTransaction(gremlinContext, Transaction.Status.ROLLBACK);
+                        break;
+                    }
+
+                    // track whether there is anything left in the iterator because it needs to be accessed after
+                    // the transaction could be closed - in that case a call to hasNext() could open a new transaction
+                    // unintentionally
+                    final boolean moreInIterator = itty.hasNext();
+
+                    try {
+                        // only need to reset the aggregation list if there's more stuff to write
+                        if (moreInIterator)
+                            aggregate = new ArrayList<>(resultIterationBatchSize);
+                        else {
+                            // iteration and serialization are both complete which means this finished successfully. note that
+                            // errors internal to script eval or timeout will rollback given GremlinServer's global configurations.
+                            // local errors will get rolledback below because the exceptions aren't thrown in those cases to be
+                            // caught by the GremlinExecutor for global rollback logic. this only needs to be committed if
+                            // there are no more items to iterate and serialization is complete
+                            if (managedTransactionsForRequest)
+                                closeTransaction(gremlinContext, Transaction.Status.COMMIT);
+
+                            // exit the result iteration loop as there are no more results left.  using this external control
+                            // because of the above commit.  some graphs may open a new transaction on the call to
+                            // hasNext()
+                            hasMore = false;
+                        }
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+                        throw ex;
+                    }
+
+                    if (!moreInIterator) iterateComplete(gremlinContext, itty);
+
+                    // the flush is called after the commit has potentially occurred.  in this way, if a commit was
+                    // required then it will be 100% complete before the client receives it. the "frame" at this point
+                    // should have completely detached objects from the transaction (i.e. serialization has occurred)
+                    // so a new one should not be opened on the flush down the netty pipeline
+                    gremlinContext.writeAndFlush(code, frame);
+                }
+            } else {
+                // don't keep triggering this warning over and over again for the same request
+                if (!warnOnce) {
+                    logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
+                    warnOnce = true;
+                }
+
+                // since the client is lagging we can hold here for a period of time for the client to catch up.
+                // this isn't blocking the IO thread - just a worker.
+                TimeUnit.MILLISECONDS.sleep(10);
+            }
+        }
+    }
+
+    /**
+     * If {@link Bytecode} is detected to contain a "graph operation" then it gets processed by this method.
+     * The recognized operations are {@link Bytecode#TX_COMMIT} and {@link Bytecode#TX_ROLLBACK} which close the
+     * transaction accordingly and then write a {@code NO_CONTENT} response to signal success.
+     *
+     * @param gremlinContext The Gremlin Server {@link Context} object containing settings, request message, etc.
+     * @param bytecode the graph operation to execute
+     * @param graph the graph that the operation applies to
+     * @throws UnsupportedOperationException if the graph does not support transactions
+     * @throws IllegalStateException if the bytecode is not a recognized graph operation
+     */
+    protected void handleGraphOperation(final Context gremlinContext, final Bytecode bytecode, final Graph graph) throws Exception {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+
+        // fail explicitly for graphs without transaction support - otherwise this method would return having
+        // neither executed the operation nor written any response for the request
+        if (!graph.features().graph().supportsTransactions())
+            throw Graph.Exceptions.transactionsNotSupported();
+
+        final boolean commit = bytecode.equals(Bytecode.TX_COMMIT);
+        if (!commit && !bytecode.equals(Bytecode.TX_ROLLBACK)) {
+            throw new IllegalStateException(String.format(
+                    "Bytecode in request is not a recognized graph operation: %s", bytecode.toString()));
+        }
+
+        closeTransaction(gremlinContext, commit ? Transaction.Status.COMMIT : Transaction.Status.ROLLBACK);
+
+        // write back a no-op for success
+        final Map<String, Object> attributes = generateStatusAttributes(gremlinContext,
+                ResponseStatusCode.NO_CONTENT, Collections.emptyIterator());
+        gremlinContext.writeAndFlush(ResponseMessage.build(msg)
+                    .code(ResponseStatusCode.NO_CONTENT)
+                    .statusAttributes(attributes)
+                    .create());
+    }
+
+    /**
+     * A hook for sub-classes that {@link #handleIterator(Context, Iterator)} invokes once the iterator has
+     * nothing left, just before the last frame is written to the client. It is not invoked when iteration
+     * stops early. The default implementation is empty.
+     *
+     * @param gremlinContext The Gremlin Server {@link Context} object containing settings, request message, etc.
+     * @param itty the iterator that was fully iterated
+     */
+    protected void iterateComplete(final Context gremlinContext, final Iterator<?> itty) {
+        // intentionally empty - sub-classes override as needed
+    }
+
+    /**
+     * Generates response status meta-data to put on a {@link ResponseMessage}.
+     *
+     * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+     *             this method
+     */
+    protected Map<String, Object> generateStatusAttributes(final Context gremlinContext,
+                                                           final ResponseStatusCode code, final Iterator<?> itty) {
+        // only return server metadata on the last message
+        if (itty.hasNext()) return Collections.emptyMap();
+
+        final Map<String, Object> metaData = new HashMap<>();
+        metaData.put(Tokens.ARGS_HOST, gremlinContext.getChannelHandlerContext().channel().remoteAddress().toString());
+
+        return metaData;
+    }
+
+    /**
+     * Generates response result meta-data to put on a {@link ResponseMessage}.
+     *
+     * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+     *             this method
+     */
+    protected Map<String, Object> generateResponseMetaData(final Context gremlinContext,
+                                                           final ResponseStatusCode code, final Iterator<?> itty) {
+        return Collections.emptyMap();
+    }
+
+    protected Frame makeFrame(final Context gremlinContext, final List<Object> aggregate,
+                              final ResponseStatusCode code, final Iterator<?> itty) throws Exception {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final ChannelHandlerContext nettyContext = gremlinContext.getChannelHandlerContext();
+        final MessageSerializer serializer = nettyContext.channel().attr(StateKey.SERIALIZER).get();
+        final boolean useBinary = nettyContext.channel().attr(StateKey.USE_BINARY).get();
+
+        final Map<String, Object> responseMetaData = generateResponseMetaData(gremlinContext, code, itty);
+        final Map<String, Object> statusAttributes = generateStatusAttributes(gremlinContext, code, itty);
+        try {
+            if (useBinary) {
+                return new Frame(serializer.serializeResponseAsBinary(ResponseMessage.build(msg)
+                        .code(code)
+                        .statusAttributes(statusAttributes)
+                        .responseMetaData(responseMetaData)
+                        .result(aggregate).create(), nettyContext.alloc()));
+            } else {
+                // the expectation is that the GremlinTextRequestDecoder will have placed a MessageTextSerializer
+                // instance on the channel.
+                final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
+                return new Frame(textSerializer.serializeResponseAsString(ResponseMessage.build(msg)
+                        .code(code)
+                        .statusAttributes(statusAttributes)
+                        .responseMetaData(responseMetaData)
+                        .result(aggregate).create()));
+            }
+        } catch (Exception ex) {
+            logger.warn("The result [{}] in the request {} could not be serialized and returned.", aggregate, msg.getRequestId(), ex);
+            final String errorMessage = String.format("Error during serialization: %s", ExceptionHelper.getMessageFromExceptionOrCause(ex));
+            final ResponseMessage error = ResponseMessage.build(msg.getRequestId())
+                    .statusMessage(errorMessage)
+                    .statusAttributeException(ex)
+                    .code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create();
+            gremlinContext.writeAndFlush(error);
+            throw ex;
+        }
+    }
+
+    protected void startTransaction(final Context gremlinContext) {
+        // do nothing - assuming auto-start transactions
+    }
+
+    private void closeTransaction(final Transaction.Status status) {
+        closeTransaction(null, status);
+    }
+
+    /**
+     * Closes a transaction with commit or rollback. Strict transaction management settings are observed when
+     * configured as such in {@link Settings#strictTransactionManagement} and when aliases are present on the
+     * request in the current {@link Context}. If the supplied {@link Context} is {@code null} then "strict" is
+     * bypassed so this form must be called with care. Bypassing is often useful to ensure that all transactions
+     * are cleaned up when multiple graphs are referenced. Prefer calling {@link #closeTransaction(Transaction.Status)}
+     * in this case instead.
+     */
+    protected void closeTransaction(final Context gremlinContext, final Transaction.Status status) {
+        if (status != Transaction.Status.COMMIT && status != Transaction.Status.ROLLBACK)
+            throw new IllegalStateException(String.format("Transaction.Status not supported: %s", status));
+
+        final boolean commit = status == Transaction.Status.COMMIT;
+        final boolean strict = gremlinContext != null && gremlinContext.getSettings().strictTransactionManagement;
+
+        if (strict) {
+            if (commit)
+                graphManager.commit(new HashSet<>(aliasesUsedByRexster));
+            else
+                graphManager.rollback(new HashSet<>(aliasesUsedByRexster));
+        } else {
+            if (commit)
+                graphManager.commitAll();
+            else
+                graphManager.rollbackAll();
+        }
+    }
+
+    private void processAuditLog(final Settings settings, final ChannelHandlerContext ctx, final Object gremlinToExecute) {
+        if (settings.enableAuditLog) {
+            AuthenticatedUser user = ctx.channel().attr(StateKey.AUTHENTICATED_USER).get();
+            if (null == user) {    // This is expected when using the AllowAllAuthenticator
+                user = AuthenticatedUser.ANONYMOUS_USER;
+            }
+            String address = ctx.channel().remoteAddress().toString();
+            if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
+            auditLogger.info("User {} with address {} requested: {}", user.getName(), address, gremlinToExecute);
+        }
+
+        if (settings.authentication.enableAuditLog) {
+            String address = ctx.channel().remoteAddress().toString();
+            if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
+            auditLogger.info("User with address {} requested: {}", address, gremlinToExecute);
+        }
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
index a282874..a050ab0 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.server.Settings;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +55,16 @@ public class HttpBasicAuthenticationHandler extends AbstractAuthenticationHandle
 
     private final Base64.Decoder decoder = Base64.getUrlDecoder();
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #HttpBasicAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+     */
+    @Deprecated
     public HttpBasicAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
-        super(authenticator);
+        this(authenticator, null, settings);
+    }
+
+    public HttpBasicAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+        super(authenticator, authorizer);
         this.settings = settings;
     }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
new file mode 100644
index 0000000..f1ac482
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngineManager;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.Bindings;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED;
+
+/**
+ * A {@link Rexster} implementation that queues tasks given to it and executes them in a serial fashion within the
+ * same thread which thus allows multiple tasks to be executed in the same transaction.
+ */
+public class MultiRexster extends AbstractRexster {
+    private static final Logger logger = LoggerFactory.getLogger(MultiRexster.class);
+    protected final BlockingQueue<Context> queue = new LinkedBlockingQueue<>();
+    private ScheduledFuture<?> requestCancelFuture;
+    private Bindings bindings;
+    private final AtomicBoolean ending = new AtomicBoolean(false);
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final GremlinScriptEngineManager scriptEngineManager;
+
+    MultiRexster(final Context gremlinContext, final String sessionId,
+                 final ConcurrentMap<String, Rexster> sessions) {
+        super(gremlinContext, sessionId, false, sessions);
+
+        // reusing the server's common script engine manager is cheaper than creating a new one per session,
+        // especially if you have to create a lot of sessions, as a new one generates a ton of throw-away objects.
+        // the option to not use it is kept open to preserve the old functionality if wanted or if there is some
+        // specific use case with sessions that needs it. if we wanted this could eventually become a per-request
+        // option so that the client could control it as necessary and get scriptengine isolation if they need it.
+        if (gremlinContext.getSettings().useCommonEngineForSessions)
+            scriptEngineManager = gremlinContext.getGremlinExecutor().getScriptEngineManager();
+        else
+            scriptEngineManager = initializeGremlinExecutor(gremlinContext).getScriptEngineManager();
+
+        scheduledExecutorService = gremlinContext.getScheduledExecutorService();
+        addTask(gremlinContext);
+    }
+
+    @Override
+    public GremlinScriptEngine getScriptEngine(final Context gremlinContext, final String language) {
+        return scriptEngineManager.getEngineByName(language);
+    }
+
+    @Override
+    public boolean acceptingTasks() {
+        return !ending.get();
+    }
+
+    @Override
+    public void addTask(final Context gremlinContext) {
+        // todo: explicitly reject request??? - for now a task offered once the worker is ending is ignored
+        if (acceptingTasks())
+            queue.offer(gremlinContext);
+    }
+
+    @Override
+    public void run() {
+        // there must be one item in the queue at least since addTask() gets called before the worker
+        // is ever started
+        Context gremlinContext = queue.poll();
+        if (null == gremlinContext)
+            throw new IllegalStateException(String.format("Worker has no initial context for session: %s", getSessionId()));
+
+        try {
+            startTransaction(gremlinContext);
+            try {
+                while (true) {
+                    // schedule timeout for the current request from the queue
+                    final long seto = gremlinContext.getRequestTimeout();
+                    requestCancelFuture = scheduledExecutorService.schedule(
+                            () -> this.triggerTimeout(seto, false),
+                            seto, TimeUnit.MILLISECONDS);
+
+                    process(gremlinContext);
+
+                    // work is done within the timeout period so cancel it
+                    cancelRequestTimeout();
+
+                    gremlinContext = queue.take();
+                }
+            } catch (Exception ex) {
+                // stop accepting requests on this worker since it is heading to close()
+                ending.set(true);
+
+                // the current context gets its exception handled...
+                handleException(gremlinContext, ex);
+            }
+        } catch (RexsterException rexex) {
+            // remaining work items in the queue are ignored since this worker is closing. must send
+            // back some sort of response to satisfy the client. writeAndFlush code is different than
+            // the ResponseMessage as we don't want the message to be "final" for the Context. that
+            // status must be reserved for the message that caused the error
+            for (Context gctx : queue) {
+                gctx.writeAndFlush(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(gctx.getRequestMessage())
+                        .code(ResponseStatusCode.SERVER_ERROR)
+                        .statusMessage(String.format(
+                                "An earlier request [%s] failed prior to this one having a chance to execute",
+                                gremlinContext.getRequestMessage().getRequestId())).create());
+            }
+
+            logger.warn(rexex.getMessage(), rexex);
+            gremlinContext.writeAndFlush(rexex.getResponseMessage());
+        } finally {
+            close();
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        ending.set(true);
+        cancelRequestTimeout();
+        super.close();
+        logger.info("Session {} closed", getSessionId());
+    }
+
+    private void cancelRequestTimeout() {
+        if (requestCancelFuture != null && !requestCancelFuture.isDone())
+            requestCancelFuture.cancel(true);
+    }
+
+    @Override
+    protected Bindings getWorkerBindings() throws RexsterException {
+        if (null == bindings)
+            bindings = super.getWorkerBindings();
+        return this.bindings;
+    }
+
+    protected GremlinExecutor initializeGremlinExecutor(final Context gremlinContext) {
+        final Settings settings = gremlinContext.getSettings();
+        final ExecutorService executor = gremlinContext.getGremlinExecutor().getExecutorService();
+        final boolean useGlobalFunctionCache = settings.useGlobalFunctionCacheForSessions;
+
+        // these initial settings don't matter so much as we don't really execute things through the
+        // GremlinExecutor directly. Just doing all this setup to make GremlinExecutor do the work of
+        // rigging up the GremlinScriptEngineManager.
+        final GremlinExecutor.Builder gremlinExecutorBuilder = GremlinExecutor.build()
+                .evaluationTimeout(settings.getEvaluationTimeout())
+                .executorService(executor)
+                .globalBindings(graphManager.getAsBindings())
+                .scheduledExecutorService(scheduledExecutorService);
+
+        settings.scriptEngines.forEach((k, v) -> {
+            // use plugins if they are present
+            if (!v.plugins.isEmpty()) {
+                // make sure that server related classes are available at init. the LifeCycleHook stuff will be
+                // added explicitly via configuration using GremlinServerGremlinModule in the yaml. need to override
+                // scriptengine settings with SessionOpProcessor specific ones as the processing for sessions is
+                // different and a global setting may not make sense for a session
+                if (v.plugins.containsKey(GroovyCompilerGremlinPlugin.class.getName())) {
+                    v.plugins.get(GroovyCompilerGremlinPlugin.class.getName()).put(CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, useGlobalFunctionCache);
+                } else {
+                    final Map<String,Object> pluginConf = new HashMap<>();
+                    pluginConf.put(CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, useGlobalFunctionCache);
+                    v.plugins.put(GroovyCompilerGremlinPlugin.class.getName(), pluginConf);
+                }
+
+                gremlinExecutorBuilder.addPlugins(k, v.plugins);
+            }
+        });
+
+        return gremlinExecutorBuilder.create();
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
new file mode 100644
index 0000000..70869dd
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.channel.Channel;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.server.Context;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * The {@code Rexster} interface is essentially a form of "worker", named for Gremlin's trusty canine robot friend
+ * who represents the "server" aspect of TinkerPop. In the most generic sense, {@code Rexster} implementations take
+ * tasks, a Gremlin Server {@link Context} and process them, which typically means "execute some Gremlin".
+ * Implementations have a fair amount of flexibility in terms of how they go about doing this, but in most cases,
+ * the {@link SingleRexster} and the {@link MultiRexster} should handle most cases quite well and are extensible for
+ * providers.
+ */
+public interface Rexster extends Runnable {
+    /**
+     * Adds a task for Rexster to complete.
+     */
+    void addTask(final Context gremlinContext);
+
+    /**
+     * Sets a reference to the job that will cancel this Rexster if it exceeds its timeout period.
+     */
+    void setSessionCancelFuture(final ScheduledFuture<?> f);
+
+    /**
+     * Sets a reference to the job itself that is running this Rexster.
+     */
+    void setSessionFuture(final Future<?> f);
+
+    /**
+     * Provides a general way to tell Rexster that it has exceeded some timeout condition.
+     */
+    void triggerTimeout(final long timeout, boolean causedBySession);
+
+    /**
+     * Determines if the supplied {@code Channel} object is the same as the one bound to the {@code Session}.
+     */
+    boolean isBoundTo(final Channel channel);
+
+    /**
+     * Determines if this Rexster can accept additional tasks or not.
+     */
+    boolean acceptingTasks();
+
+    public class RexsterException extends Exception {
+        private final ResponseMessage responseMessage;
+
+        public RexsterException(final String message, final ResponseMessage responseMessage) {
+            super(message);
+            this.responseMessage = responseMessage;
+        }
+
+        public RexsterException(final String message, final Throwable cause, final ResponseMessage responseMessage) {
+            super(message, cause);
+            this.responseMessage = responseMessage;
+        }
+
+        public ResponseMessage getResponseMessage() {
+            return this.responseMessage;
+        }
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
index d58553d..31dabd0 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
@@ -21,14 +21,14 @@ package org.apache.tinkerpop.gremlin.server.handler;
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http.HttpMessage;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 import org.apache.tinkerpop.gremlin.server.Settings;
-import org.apache.tinkerpop.gremlin.server.handler.HttpBasicAuthenticationHandler;
-import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler;
-import org.apache.tinkerpop.gremlin.server.handler.WebSocketHandlerUtil;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 
+import static org.apache.tinkerpop.gremlin.server.AbstractChannelizer.PIPELINE_AUTHORIZER;
 import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_AUTHENTICATOR;
 
 /**
@@ -39,18 +39,33 @@ public class SaslAndHttpBasicAuthenticationHandler extends SaslAuthenticationHan
 
     private final String HTTP_AUTH = "http-authentication";
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #SaslAndHttpBasicAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+     */
+    @Deprecated
     public SaslAndHttpBasicAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
-        super(authenticator, settings);
+        this(authenticator, null, settings);
+    }
+
+    public SaslAndHttpBasicAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+        super(authenticator, authorizer, settings);
     }
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object obj) throws Exception {
         if (obj instanceof HttpMessage && !WebSocketHandlerUtil.isWebSocket((HttpMessage)obj)) {
-            ChannelPipeline pipeline = ctx.pipeline();
+            final ChannelPipeline pipeline = ctx.pipeline();
             if (null != pipeline.get(HTTP_AUTH)) {
                 pipeline.remove(HTTP_AUTH);
             }
             pipeline.addAfter(PIPELINE_AUTHENTICATOR, HTTP_AUTH, new HttpBasicAuthenticationHandler(authenticator, this.settings));
+
+            if (authorizer != null) {
+                final ChannelInboundHandlerAdapter authorizationHandler = new HttpBasicAuthorizationHandler(authorizer);
+                pipeline.remove(PIPELINE_AUTHORIZER);
+                pipeline.addAfter(HTTP_AUTH, PIPELINE_AUTHORIZER, authorizationHandler);
+            }
+
             ctx.fireChannelRead(obj);
         } else {
             super.channelRead(ctx, obj);
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
index e8216a6..55da853 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
@@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.server.Settings;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,8 +60,16 @@ public class SaslAuthenticationHandler extends AbstractAuthenticationHandler {
 
     protected final Settings settings;
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #SaslAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+     */
+    @Deprecated
     public SaslAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
-        super(authenticator);
+        this(authenticator, null, settings);
+    }
+
+    public SaslAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+        super(authenticator, authorizer);
         this.settings = settings;
     }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java
new file mode 100644
index 0000000..69a269f
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+
+/**
+ * A simple {@link Rexster} implementation that accepts one request, processes it and exits.
+ */
+public class SingleRexster extends AbstractRexster {
+    private static final Logger logger = LoggerFactory.getLogger(SingleRexster.class);
+    protected final Context gremlinContext;
+
+    SingleRexster(final Context gremlinContext, final String sessionId,
+                  final ConcurrentMap<String, Rexster> sessions) {
+        super(gremlinContext, sessionId,true, sessions);
+        this.gremlinContext = gremlinContext;
+    }
+
+    /**
+     * The {@code SingleRexster} can only process one request so the initial construction of it already has the
+     * request in it and no more can be added, therefore this method always returns {@code false}.
+     */
+    @Override
+    public boolean acceptingTasks() {
+        return false;
+    }
+
+    @Override
+    public void addTask(final Context gremlinContext) {
+        throw new UnsupportedOperationException("SingleWorker doesn't accept tasks beyond the one provided to the constructor");
+    }
+
+    @Override
+    public void run() {
+        try {
+            startTransaction(gremlinContext);
+            process(gremlinContext);
+        } catch (RexsterException we) {
+            logger.warn(we.getMessage(), we);
+            gremlinContext.writeAndFlush(we.getResponseMessage());
+        } finally {
+            close();
+        }
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
new file mode 100644
index 0000000..7483062
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.server.Channelizer;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.Rexster.RexsterException;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Handler for websockets to be used with the {@link UnifiedChannelizer}.
+ */
+@ChannelHandler.Sharable
+public class UnifiedHandler extends SimpleChannelInboundHandler<RequestMessage> {
+    private static final Logger logger = LoggerFactory.getLogger(UnifiedHandler.class);
+
+    protected final Settings settings;
+    protected final GraphManager graphManager;
+    protected final GremlinExecutor gremlinExecutor;
+    protected final ScheduledExecutorService scheduledExecutorService;
+    protected final ExecutorService executorService;
+    protected final Channelizer channelizer;
+
+    protected final ConcurrentMap<String, Rexster> sessions = new ConcurrentHashMap<>();
+
+    /**
+     * This may or may not be the full set of invalid binding keys.  It is dependent on the static imports made to
+     * Gremlin Server.  This should get rid of the worst offenders though and provide a good message back to the
+     * calling client.
+     * <p/>
+     * Use of {@code toUpperCase()} on the accessor values of {@link T} solves an issue where the {@code ScriptEngine}
+     * ignores private scope on {@link T} and imports static fields.
+     */
+    protected static final Set<String> INVALID_BINDINGS_KEYS = new HashSet<>();
+
+    static {
+        INVALID_BINDINGS_KEYS.addAll(Arrays.asList(
+                T.id.name(), T.key.name(),
+                T.label.name(), T.value.name(),
+                T.id.getAccessor(), T.key.getAccessor(),
+                T.label.getAccessor(), T.value.getAccessor(),
+                T.id.getAccessor().toUpperCase(), T.key.getAccessor().toUpperCase(),
+                T.label.getAccessor().toUpperCase(), T.value.getAccessor().toUpperCase()));
+
+        for (Column enumItem : Column.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Order enumItem : Order.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Operator enumItem : Operator.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Scope enumItem : Scope.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Pop enumItem : Pop.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+    }
+
+    public UnifiedHandler(final Settings settings, final GraphManager graphManager,
+                          final GremlinExecutor gremlinExecutor,
+                          final ScheduledExecutorService scheduledExecutorService,
+                          final Channelizer channelizer) {
+        this.settings = settings;
+        this.graphManager = graphManager;
+        this.gremlinExecutor = gremlinExecutor;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.executorService = gremlinExecutor.getExecutorService();
+        this.channelizer = channelizer;
+    }
+
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final RequestMessage msg) throws Exception {
+        try {
+            try {
+                validateRequest(msg, graphManager);
+            } catch (RexsterException we) {
+                ctx.writeAndFlush(we.getResponseMessage());
+                return;
+            }
+
+            final Optional<String> optSession = msg.optionalArgs(Tokens.ARGS_SESSION);
+            final String sessionId = optSession.orElse(UUID.randomUUID().toString());
+
+            // still using GremlinExecutor here in the Context so that this object doesn't need to immediately
+            // change, but also because GremlinExecutor/ScriptEngine config is all rigged up into the server nicely
+            // right now. when the UnifiedChannelizer is "ready" we can factor out the GremlinExecutor
+            final Context gremlinContext = new Context(msg, ctx, settings, graphManager,
+                    gremlinExecutor, scheduledExecutorService);
+
+            if (sessions.containsKey(sessionId)) {
+                final Rexster rexster = sessions.get(sessionId);
+
+                // check if the session is still accepting requests - if not block further requests
+                if (!rexster.acceptingTasks()) {
+                    final String sessionClosedMessage = String.format(
+                            "Session %s is no longer accepting requests as it has been closed", sessionId);
+                    final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(sessionClosedMessage).create();
+                    ctx.writeAndFlush(response);
+                    return;
+                }
+
+                // check if the session is bound to this channel, thus one client per session
+                if (!rexster.isBoundTo(gremlinContext.getChannelHandlerContext().channel())) {
+                    final String sessionClosedMessage = String.format("Session %s is not bound to the connecting client", sessionId);
+                    final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(sessionClosedMessage).create();
+                    ctx.writeAndFlush(response);
+                    return;
+                }
+
+                rexster.addTask(gremlinContext);
+            } else {
+                final Rexster rexster = optSession.isPresent() ?
+                        createMulti(gremlinContext, sessionId) : createSingle(gremlinContext, sessionId);
+                final Future<?> sessionFuture = executorService.submit(rexster);
+                rexster.setSessionFuture(sessionFuture);
+                sessions.put(sessionId, rexster);
+
+                // determine the max session life. for multi that's going to be "session life" and for single that
+                // will be the span of the request timeout
+                final long seto = gremlinContext.getRequestTimeout();
+                final long sessionLife = optSession.isPresent() ? settings.sessionLifetimeTimeout : seto;
+
+                // the session timeout is only scheduled when the request timeout is greater than zero
+                if (seto > 0) {
+                    final ScheduledFuture<?> sessionCancelFuture =
+                            scheduledExecutorService.schedule(
+                                    () -> rexster.triggerTimeout(sessionLife, optSession.isPresent()),
+                                    sessionLife, TimeUnit.MILLISECONDS);
+                    rexster.setSessionCancelFuture(sessionCancelFuture);
+                }
+            }
+        } finally {
+            ReferenceCountUtil.release(msg);
+        }
+    }
+
+    protected void validateRequest(final RequestMessage message, final GraphManager graphManager) throws RexsterException {
+        if (!message.optionalArgs(Tokens.ARGS_GREMLIN).isPresent()) {
+            final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", message.getOp(), Tokens.ARGS_GREMLIN);
+            throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+        }
+
+        if (message.optionalArgs(Tokens.ARGS_BINDINGS).isPresent()) {
+            final Map bindings = (Map) message.getArgs().get(Tokens.ARGS_BINDINGS);
+            if (IteratorUtils.anyMatch(bindings.keySet().iterator(), k -> null == k || !(k instanceof String))) {
+                final String msg = String.format("The [%s] message is using one or more invalid binding keys - they must be of type String and cannot be null", Tokens.OPS_EVAL);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            final Set<String> badBindings = IteratorUtils.set(IteratorUtils.<String>filter(bindings.keySet().iterator(), INVALID_BINDINGS_KEYS::contains));
+            if (!badBindings.isEmpty()) {
+                final String msg = String.format("The [%s] message supplies one or more invalid parameters key of [%s] - these are reserved names.", Tokens.OPS_EVAL, badBindings);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            // ignore control bindings that get passed in with the "#jsr223" prefix - those aren't used in compilation
+            if (IteratorUtils.count(IteratorUtils.filter(bindings.keySet().iterator(), k -> !k.toString().startsWith("#jsr223"))) > settings.maxParameters) {
+                final String msg = String.format("The [%s] message contains %s bindings which is more than is allowed by the server %s configuration",
+                        Tokens.OPS_EVAL, bindings.size(), settings.maxParameters);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+        }
+
+        // bytecode must have an alias defined
+        if (message.getOp().equals(Tokens.OPS_BYTECODE)) {
+            final Optional<Map<String, String>> aliases = message.optionalArgs(Tokens.ARGS_ALIASES);
+            if (!aliases.isPresent()) {
+                final String msg = String.format("A message with [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            if (aliases.get().size() != 1 || !aliases.get().containsKey(Tokens.VAL_TRAVERSAL_SOURCE_ALIAS)) {
+                final String msg = String.format("A message with [%s] op code requires the [%s] argument to be a Map containing one alias assignment named '%s'.",
+                        Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES, Tokens.VAL_TRAVERSAL_SOURCE_ALIAS);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+
+            final String traversalSourceBindingForAlias = aliases.get().values().iterator().next();
+            if (!graphManager.getTraversalSourceNames().contains(traversalSourceBindingForAlias)) {
+                final String msg = String.format("The traversal source [%s] for alias [%s] is not configured on the server.", traversalSourceBindingForAlias, Tokens.VAL_TRAVERSAL_SOURCE_ALIAS);
+                throw new RexsterException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+            }
+        }
+    }
+
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
+        // only need to handle this event if the idle monitor is on
+        if (!channelizer.supportsIdleMonitor()) return;
+
+        if (evt instanceof IdleStateEvent) {
+            final IdleStateEvent e = (IdleStateEvent) evt;
+
+            // if no requests (reader) then close, if no writes from server to client then ping. clients should
+            // periodically ping the server, but coming from this direction allows the server to kill channels that
+            // have dead clients on the other end
+            if (e.state() == IdleState.READER_IDLE) {
+                logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel().id().asShortText());
+                ctx.close();
+            } else if (e.state() == IdleState.WRITER_IDLE && settings.keepAliveInterval > 0) {
+                logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel().id().asShortText());
+                ctx.writeAndFlush(channelizer.createIdleDetectionMessage());
+            }
+        }
+    }
+
+    protected Rexster createSingle(final Context gremlinContext, final String sessionId) {
+        return new SingleRexster(gremlinContext, sessionId, sessions);
+    }
+
+    protected Rexster createMulti(final Context gremlinContext, final String sessionId) {
+        return new MultiRexster(gremlinContext, sessionId, sessions);
+    }
+
+    public boolean isActiveSession(final String sessionId) {
+        return sessions.containsKey(sessionId);
+    }
+
+    public int getActiveSessionCount() {
+        return sessions.size();
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
index e4f79b4..a93e1c4 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
@@ -28,9 +28,9 @@ import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
 import org.apache.tinkerpop.gremlin.server.channel.WsAndHttpChannelizer;
 import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
 
+import static org.apache.tinkerpop.gremlin.server.AbstractChannelizer.PIPELINE_HTTP_AGGREGATOR;
 import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_AUTHENTICATOR;
 import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_REQUEST_HANDLER;
-import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_HTTP_RESPONSE_ENCODER;
 
 /**
  * A ChannelInboundHandlerAdapter for use with {@link WsAndHttpChannelizer} that toggles between WebSockets
@@ -66,11 +66,11 @@ public class WsAndHttpChannelizerHandler extends ChannelInboundHandlerAdapter {
                 pipeline.remove(PIPELINE_REQUEST_HANDLER);
                 final ChannelHandler authenticator = pipeline.get(PIPELINE_AUTHENTICATOR);
                 pipeline.remove(PIPELINE_AUTHENTICATOR);
-                pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_AUTHENTICATOR, authenticator);
+                pipeline.addAfter(PIPELINE_HTTP_AGGREGATOR, PIPELINE_AUTHENTICATOR, authenticator);
                 pipeline.addAfter(PIPELINE_AUTHENTICATOR, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
             } else {
                 pipeline.remove(PIPELINE_REQUEST_HANDLER);
-                pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
+                pipeline.addAfter(PIPELINE_HTTP_AGGREGATOR, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
             }
         }
         ctx.fireChannelRead(obj);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
index 55f89e6..deb1734 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
@@ -18,7 +18,14 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizerIntegrateTest;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizerIntegrateTest;
+import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
+import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizerIntegrateTest;
 import org.apache.tinkerpop.gremlin.server.op.OpLoader;
+import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
+import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -96,16 +103,28 @@ public abstract class AbstractGremlinServerIntegrationTest {
         startServer(settings);
     }
 
+    private boolean shouldTestUnified() {
+        // ignore all tests in the UnifiedChannelizerIntegrateTest package as they are already rigged to test
+        // over the various channelizer implementations
+        return Boolean.parseBoolean(System.getProperty("testUnified", "false")) &&
+                !this.getClass().getPackage().equals(UnifiedChannelizerIntegrateTest.class.getPackage());
+    }
+
     public void startServer(final Settings settings) throws Exception {
         if (null == settings) {
             startServer();
         } else {
-            final Settings overridenSettings = overrideSettings(settings);
-            ServerTestHelper.rewritePathsInGremlinServerSettings(overridenSettings);
+            final Settings oSettings = overrideSettings(settings);
+
+            if (shouldTestUnified()) {
+                oSettings.channelizer = UnifiedChannelizer.class.getName();
+            }
+
+            ServerTestHelper.rewritePathsInGremlinServerSettings(oSettings);
             if (GREMLIN_SERVER_EPOLL) {
-                overridenSettings.useEpollEventLoop = true;
+                oSettings.useEpollEventLoop = true;
             }
-            this.server = new GremlinServer(overridenSettings);
+            this.server = new GremlinServer(oSettings);
             server.start().join();
         }
     }
@@ -119,6 +138,10 @@ public abstract class AbstractGremlinServerIntegrationTest {
             overriddenSettings.useEpollEventLoop = true;
         }
 
+        if (shouldTestUnified()) {
+            overriddenSettings.channelizer = UnifiedChannelizer.class.getName();
+        }
+
         this.server = new GremlinServer(overriddenSettings);
 
         server.start().join();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
index f8956e2..e685932 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
@@ -47,7 +47,8 @@ public class ContextTest {
 
     private final ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
     private final RequestMessage request = RequestMessage.build("test").create();
-    private final Context context = new Context(request, ctx, null, null, null, null);
+    private final Settings settings = new Settings();
+    private final Context context = new Context(request, ctx, settings, null, null, null);
     private final Log4jRecordingAppender recordingAppender = new Log4jRecordingAppender();
 
     private Level originalLogLevel;
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 3c1d308..c11cad8 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -42,6 +42,7 @@ import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
 import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.handler.OpExecutorHandler;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -99,6 +100,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assume.assumeThat;
 import static org.mockito.Mockito.verify;
 
 /**
@@ -1645,16 +1647,60 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                 final CompletableFuture<List<Result>> futureThird = third.get().all();
                 final CompletableFuture<List<Result>> futureFourth = fourth.get().all();
 
-                assertFutureTimeout(futureFirst);
-                assertFutureTimeout(futureSecond);
-                assertFutureTimeout(futureThird);
-                assertFutureTimeout(futureFourth);
+                // there is slightly different assertion logic with UnifiedChannelizer given differences in session
+                // behavior where UnifiedChannelizer sessions won't continue processing in the face of a timeout and
+                // a new session will need to be created
+                if (server.getServerGremlinExecutor().getSettings().channelizer.equals(UnifiedChannelizer.class.getName())) {
+                    // the first times out and the rest get SERVER_ERROR
+                    try {
+                        futureFirst.get();
+                        fail("Should have timed out");
+                    } catch (Exception ex) {
+                        final Throwable root = ExceptionUtils.getRootCause(ex);
+                        assertThat(root, instanceOf(ResponseException.class));
+                        assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode());
+                        assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
+                    }
+
+                    assertFutureTimeoutUnderUnified(futureSecond);
+                    assertFutureTimeoutUnderUnified(futureThird);
+                    assertFutureTimeoutUnderUnified(futureFourth);
+                } else {
+                    assertFutureTimeout(futureFirst);
+                    assertFutureTimeout(futureSecond);
+                    assertFutureTimeout(futureThird);
+                    assertFutureTimeout(futureFourth);
+                }
             }
         } finally {
             cluster.close();
         }
     }
 
+    private void assertFutureTimeoutUnderUnified(final CompletableFuture<List<Result>> f) {
+        try {
+            f.get();
+            fail("Should have timed out");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(ResponseException.class));
+            assertEquals(ResponseStatusCode.SERVER_ERROR, ((ResponseException) root).getResponseStatusCode());
+            assertThat(root.getMessage(), allOf(startsWith("An earlier request"), endsWith("failed prior to this one having a chance to execute")));
+        }
+    }
+
+    private void assertFutureTimeout(final CompletableFuture<List<Result>> f) {
+        try {
+            f.get();
+            fail("Should have timed out");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(ResponseException.class));
+            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode());
+            assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
+        }
+    }
+
     @Test
     public void shouldCloseAllClientsOnCloseOfCluster() throws Exception {
         final Cluster cluster = TestClientFactory.open();
@@ -1780,20 +1826,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
 
     }
 
-    private void assertFutureTimeout(final CompletableFuture<List<Result>> futureFirst) {
-        try
-        {
-            futureFirst.get();
-            fail("Should have timed out");
-        }
-        catch (Exception ex)
-        {
-            final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root, instanceOf(ResponseException.class));
-            assertThat(root.getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 250 ms"));
-        }
-    }
-
     @Test
     public void shouldClusterReadFileFromResources() throws Exception {
         final Cluster cluster = Cluster.open(TestClientFactory.RESOURCE_PATH);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
index a9d7228..e81f889 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
@@ -31,10 +31,10 @@ import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
 import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
-import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.Krb5Authenticator;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
@@ -81,7 +81,7 @@ public class GremlinServerAuditLogDeprecatedIntegrateTest extends AbstractGremli
         rootLogger.addAppender(recordingAppender);
 
         try {
-            final String moduleBaseDir = System.getProperty("basedir");
+            final String moduleBaseDir = System.getProperty("basedir", ".");
             final String authConfigName = moduleBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
             System.setProperty("java.security.auth.login.config", authConfigName);
             kdcServer = new KdcFixture(moduleBaseDir);
@@ -139,6 +139,7 @@ public class GremlinServerAuditLogDeprecatedIntegrateTest extends AbstractGremli
                 settings.host = "localhost";
                 settings.channelizer = HttpChannelizer.class.getName();
                 authSettings.authenticator = SimpleAuthenticator.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
                 break;
         }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
index 69bb974..28ad1e1 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.Krb5Authenticator;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
@@ -81,7 +82,7 @@ public class GremlinServerAuditLogIntegrateTest extends AbstractGremlinServerInt
         rootLogger.addAppender(recordingAppender);
 
         try {
-            final String moduleBaseDir = System.getProperty("basedir");
+            final String moduleBaseDir = System.getProperty("basedir", ".");
             final String authConfigName = moduleBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
             System.setProperty("java.security.auth.login.config", authConfigName);
             kdcServer = new KdcFixture(moduleBaseDir);
@@ -139,6 +140,7 @@ public class GremlinServerAuditLogIntegrateTest extends AbstractGremlinServerInt
                 settings.host = "localhost";
                 settings.channelizer = HttpChannelizer.class.getName();
                 authSettings.authenticator = SimpleAuthenticator.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
                 break;
         }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
index 1486c1b..0b76dab 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
@@ -65,7 +65,7 @@ public class GremlinServerAuthKrb5IntegrateTest extends AbstractGremlinServerInt
         handlerLogger.setLevel(Level.OFF);
 
         try {
-            final String projectBaseDir = System.getProperty("basedir");
+            final String projectBaseDir = System.getProperty("basedir", ".");
             final String authConfigName = projectBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
             System.setProperty("java.security.auth.login.config", authConfigName);
             kdcServer = new KdcFixture(projectBaseDir);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
index 5614123..00eacda 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.authz.AllowListAuthorizer;
 import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.gremlin.util.function.Lambda;
@@ -116,10 +117,12 @@ public class GremlinServerAuthzIntegrateTest extends AbstractGremlinServerIntegr
             case "shouldFailAuthorizeWithHttpTransport":
             case "shouldKeepAuthorizingWithHttpTransport":
                 settings.channelizer = HttpChannelizer.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 break;
             case "shouldAuthorizeWithAllowAllAuthenticatorAndHttpTransport":
                 settings.channelizer = HttpChannelizer.class.getName();
                 authSettings.authenticator = AllowAllAuthenticator.class.getName();
+                authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
                 authSettings.config = null;
                 final String fileHttp = Objects.requireNonNull(getClass().getClassLoader().getResource(yamlHttpName)).getFile();
                 authzSettings.config.put(AllowListAuthorizer.KEY_AUTHORIZATION_ALLOWLIST, fileHttp);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
index fadc5e2..05cd939 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
@@ -36,6 +36,7 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -133,6 +134,7 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra
     private void configureForAuthentication(final Settings settings) {
         final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
         authSettings.authenticator = SimpleAuthenticator.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
 
         // use a credentials graph with two users in it: stephen/password and marko/rainbow-dash
         final Map<String,Object> authConfig = new HashMap<>();
@@ -147,7 +149,7 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra
         authSettings.authenticator = SimpleAuthenticator.class.getName();
 
         //Add basic auth handler to make sure the reflection code path works.
-        authSettings.authenticationHandler = HttpBasicAuthenticationHandler.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
 
         // use a credentials graph with two users in it: stephen/password and marko/rainbow-dash
         final Map<String,Object> authConfig = new HashMap<>();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 4f0922e..270bed2 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.SimpleSandboxExtension;
 import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
+import org.apache.tinkerpop.gremlin.server.handler.UnifiedHandler;
 import org.apache.tinkerpop.gremlin.structure.RemoteGraph;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -58,7 +59,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -80,6 +80,7 @@ import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalS
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.AllOf.allOf;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
 import static org.hamcrest.core.StringStartsWith.startsWith;
@@ -115,9 +116,11 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
 
         if (name.getMethodName().equals("shouldPingChannelIfClientDies") ||
                 name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
-            final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
-            previousLogLevel = webSocketClientHandlerLogger.getLevel();
-            webSocketClientHandlerLogger.setLevel(Level.INFO);
+            final org.apache.log4j.Logger opSelectorHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
+            final org.apache.log4j.Logger unifiedHandlerLogger = org.apache.log4j.Logger.getLogger(UnifiedHandler.class);
+            previousLogLevel = opSelectorHandlerLogger.getLevel();
+            opSelectorHandlerLogger.setLevel(Level.INFO);
+            unifiedHandlerLogger.setLevel(Level.INFO);
         }
 
         rootLogger.addAppender(recordingAppender);
@@ -129,8 +132,10 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
 
         if (name.getMethodName().equals("shouldPingChannelIfClientDies")||
                 name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
-            final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
-            webSocketClientHandlerLogger.setLevel(previousLogLevel);
+            final org.apache.log4j.Logger opSelectorHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
+            opSelectorHandlerLogger.setLevel(previousLogLevel);
+            final org.apache.log4j.Logger unifiedHandlerLogger = org.apache.log4j.Logger.getLogger(UnifiedHandler.class);
+            unifiedHandlerLogger.setLevel(previousLogLevel);
         }
 
         rootLogger.removeAppender(recordingAppender);
@@ -145,6 +150,8 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
         switch (nameOfTest) {
             case "shouldProvideBetterExceptionForMethodCodeTooLarge":
                 settings.maxContentLength = 4096000;
+
+                // OpProcessor setting
                 final Settings.ProcessorSettings processorSettingsBig = new Settings.ProcessorSettings();
                 processorSettingsBig.className = StandardOpProcessor.class.getName();
                 processorSettingsBig.config = new HashMap<String,Object>() {{
@@ -152,6 +159,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                 }};
                 settings.processors.clear();
                 settings.processors.add(processorSettingsBig);
+
+                // Unified setting
+                settings.maxParameters = Integer.MAX_VALUE;
                 break;
             case "shouldRespectHighWaterMarkSettingAndSucceed":
                 settings.writeBufferHighWaterMark = 64;
@@ -182,6 +192,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                 settings.scriptEngines.get("gremlin-groovy").config = getScriptEngineConfForBaseScript();
                 break;
             case "shouldReturnInvalidRequestArgsWhenBindingCountExceedsAllowable":
+                // OpProcessor settings
                 final Settings.ProcessorSettings processorSettingsSmall = new Settings.ProcessorSettings();
                 processorSettingsSmall.className = StandardOpProcessor.class.getName();
                 processorSettingsSmall.config = new HashMap<String,Object>() {{
@@ -189,6 +200,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                 }};
                 settings.processors.clear();
                 settings.processors.add(processorSettingsSmall);
+
+                // Unified settings
+                settings.maxParameters = 1;
                 break;
             case "shouldTimeOutRemoteTraversal":
                 settings.evaluationTimeout = 500;
@@ -699,7 +713,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
     public void shouldReceiveFailureTimeOutOnScriptEval() throws Exception {
         try (SimpleClient client = TestClientFactory.createWebSocketClient()){
             final List<ResponseMessage> responses = client.submit("Thread.sleep(3000);'some-stuff-that-should not return'");
-            assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 1000 ms"));
+            assertThat(responses.get(0).getStatus().getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("1000 ms")));
 
             // validate that we can still send messages to the server
             assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
@@ -715,7 +729,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
                     .addArg(Tokens.ARGS_GREMLIN, "Thread.sleep(3000);'some-stuff-that-should not return'")
                     .create();
             final List<ResponseMessage> responses = client.submit(msg);
-            assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 100 ms"));
+            assertThat(responses.get(0).getStatus().getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("100 ms")));
 
             // validate that we can still send messages to the server
             assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 3dc58ae..fe9ff99 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.driver.simple.SimpleClient;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.junit.After;
@@ -94,11 +95,16 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             case "shouldHaveTheSessionTimeout":
             case "shouldCloseSessionOnceOnRequest":
                 settings.processors.clear();
+
+                // OpProcessor setting
                 final Settings.ProcessorSettings processorSettings = new Settings.ProcessorSettings();
                 processorSettings.className = SessionOpProcessor.class.getCanonicalName();
                 processorSettings.config = new HashMap<>();
                 processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L);
                 settings.processors.add(processorSettings);
+
+                // Unified setting
+                settings.sessionLifetimeTimeout = 3000L;
                 break;
             case "shouldCloseSessionOnClientClose":
                 clearNeo4j(settings);
@@ -106,13 +112,27 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             case "shouldEnsureSessionBindingsAreThreadSafe":
                 settings.threadPoolWorker = 2;
                 break;
+            case "shouldUseGlobalFunctionCache":
+                // OpProcessor settings are good by default
+                // UnifiedHandler settings
+                settings.useCommonEngineForSessions = false;
+                settings.useGlobalFunctionCacheForSessions = true;
+
+                break;
             case "shouldNotUseGlobalFunctionCache":
                 settings.processors.clear();
+
+                // OpProcessor settings
                 final Settings.ProcessorSettings processorSettingsForDisableFunctionCache = new Settings.ProcessorSettings();
                 processorSettingsForDisableFunctionCache.className = SessionOpProcessor.class.getCanonicalName();
                 processorSettingsForDisableFunctionCache.config = new HashMap<>();
                 processorSettingsForDisableFunctionCache.config.put(SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, false);
                 settings.processors.add(processorSettingsForDisableFunctionCache);
+
+                // UnifiedHandler settings
+                settings.useCommonEngineForSessions = false;
+                settings.useGlobalFunctionCacheForSessions = false;
+
                 break;
             case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient":
             case "shouldExecuteInSessionWithTransactionManagement":
@@ -124,6 +144,11 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         return settings;
     }
 
+    private boolean isUsingUnifiedChannelizer() {
+        return server.getServerGremlinExecutor().
+                getSettings().channelizer.equals(UnifiedChannelizer.class.getName());
+    }
+
     private static void clearNeo4j(Settings settings) {
         deleteDirectory(new File("/tmp/neo4j"));
         settings.graphs.put("graph", "conf/neo4j-empty.properties");
@@ -140,8 +165,13 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         client1.close();
         cluster1.close();
 
-        assertThat(recordingAppender.getMessages(), hasItem("INFO - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
-        assertThat(recordingAppender.getMessages(), hasItem("INFO - Session shouldCloseSessionOnClientClose closed\n"));
+        // session close log messages are not relevant with UnifiedChannelizer, so check the session is inactive instead
+        if (isUsingUnifiedChannelizer()) {
+            assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+        } else {
+            assertThat(recordingAppender.getMessages(), hasItem("INFO - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
+            assertThat(recordingAppender.getMessages(), hasItem("INFO - Session shouldCloseSessionOnClientClose closed\n"));
+        }
 
         // try to reconnect to that session and make sure no state is there
         final Cluster clusterReconnect = TestClientFactory.open();
@@ -159,16 +189,27 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         // the commit from client1 should not have gone through so there should be no data present.
         assertEquals(0, clientReconnect.submit("graph.traversal().V().count()").all().join().get(0).getInt());
         clusterReconnect.close();
+
+        if (isUsingUnifiedChannelizer()) {
+            assertEquals(0, ((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().getActiveSessionCount());
+        }
     }
 
     @Test
     public void shouldUseGlobalFunctionCache() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect(name.getMethodName());
+        final Client session = cluster.connect(name.getMethodName());
+        final Client client = cluster.connect();
+
+        assertEquals(3, session.submit("def sumItUp(x,y){x+y};sumItUp(1,2)").all().get().get(0).getInt());
+        assertEquals(3, session.submit("sumItUp(1,2)").all().get().get(0).getInt());
 
         try {
-            assertEquals(3, client.submit("def addItUp(x,y){x+y};addItUp(1,2)").all().get().get(0).getInt());
-            assertEquals(3, client.submit("addItUp(1,2)").all().get().get(0).getInt());
+            client.submit("sumItUp(1,2)").all().get().get(0).getInt();
+            fail("Global functions should not be cached so the call to sumItUp() should fail");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root.getMessage(), startsWith("No signature of method"));
         } finally {
             cluster.close();
         }
@@ -180,14 +221,14 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
         final Client client = cluster.connect(name.getMethodName());
 
         try {
-            assertEquals(3, client.submit("def addItUp(x,y){x+y};addItUp(1,2)").all().get().get(0).getInt());
+            assertEquals(3, client.submit("def sumItUp(x,y){x+y};sumItUp(1,2)").all().get().get(0).getInt());
         } catch (Exception ex) {
             cluster.close();
             throw ex;
         }
 
         try {
-            client.submit("addItUp(1,2)").all().get().get(0).getInt();
+            client.submit("sumItUp(1,2)").all().get().get(0).getInt();
             fail("Global functions should not be cached so the call to addItUp() should fail");
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
@@ -273,8 +314,12 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             cluster.close();
         }
 
-        assertEquals(1, recordingAppender.getMessages().stream()
-                .filter(msg -> msg.equals("INFO - Session shouldCloseSessionOnceOnRequest closed\n")).count());
+        if (isUsingUnifiedChannelizer()) {
+            assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+        } else {
+            assertEquals(1, recordingAppender.getMessages().stream()
+                    .filter(msg -> msg.equals("INFO - Session shouldCloseSessionOnceOnRequest closed\n")).count());
+        }
     }
 
     @Test
@@ -308,9 +353,13 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
             cluster.close();
         }
 
-        // there will be one for the timeout and a second for closing the cluster
-        assertEquals(2, recordingAppender.getMessages().stream()
-                .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
+        if (isUsingUnifiedChannelizer()) {
+            assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+        } else {
+            // there will be one for the timeout and a second for closing the cluster
+            assertEquals(2, recordingAppender.getMessages().stream()
+                    .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
+        }
     }
 
     @Test
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
index 166764b..f28969c 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
@@ -22,6 +22,7 @@ import org.apache.http.NoHttpResponseException;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.Settings;
 
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 import org.junit.Test;
 import org.junit.Assert;
 
@@ -29,6 +30,9 @@ import java.util.Map;
 import java.util.HashMap;
 import java.net.SocketException;
 
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assume.assumeThat;
+
 public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
 
     @Override
@@ -74,6 +78,7 @@ public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChanneliz
         final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
         final Map<String,Object> authConfig = new HashMap<>();
         authSettings.authenticator = SimpleAuthenticator.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
         authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
         authSettings.config = authConfig;
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
similarity index 58%
copy from gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
copy to gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
index 166764b..a090a20 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
@@ -18,55 +18,28 @@
  */
 package org.apache.tinkerpop.gremlin.server.channel;
 
-import org.apache.http.NoHttpResponseException;
-import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
 import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
 
-import org.junit.Test;
-import org.junit.Assert;
-
-import java.util.Map;
 import java.util.HashMap;
-import java.net.SocketException;
-
-public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
-
-    @Override
-    public Settings overrideSettings(final Settings settings) {
-        super.overrideSettings(settings);
-        final String nameOfTest = name.getMethodName();
-        if (nameOfTest.equals("shouldBreakOnInvalidAuthenticationHandler") ) {
-            settings.authentication = getAuthSettings();
-            settings.authentication.authenticationHandler = "Foo.class";
-        }
-        return settings;
-    }
+import java.util.Map;
 
-    @Test
-    public void shouldBreakOnInvalidAuthenticationHandler() throws Exception {
-        final CombinedTestClient client =  new CombinedTestClient(getProtocol());
-        try {
-            client.sendAndAssert("2+2", 4);
-            Assert.fail("An exception should be thrown with an invalid authentication handler");
-        } catch (NoHttpResponseException | SocketException e) {
-        } finally {
-            client.close();
-        }
-    }
+public class UnifiedChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
 
     @Override
     public String getProtocol() {
-        return HTTP;
+        return WS_AND_HTTP;
     }
 
     @Override
     public String getSecureProtocol() {
-        return HTTPS;
+        return WSS_AND_HTTPS;
     }
 
     @Override
     public String getChannelizer() {
-        return HttpChannelizer.class.getName();
+        return UnifiedChannelizer.class.getName();
     }
 
     @Override
@@ -74,6 +47,7 @@ public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChanneliz
         final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
         final Map<String,Object> authConfig = new HashMap<>();
         authSettings.authenticator = SimpleAuthenticator.class.getName();
+        authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
         authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
         authSettings.config = authConfig;
 
diff --git a/gremlin-tools/gremlin-benchmark/pom.xml b/gremlin-tools/gremlin-benchmark/pom.xml
index 688f44f..d0cdc0e 100644
--- a/gremlin-tools/gremlin-benchmark/pom.xml
+++ b/gremlin-tools/gremlin-benchmark/pom.xml
@@ -65,11 +65,6 @@ limitations under the License.
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.tinkerpop</groupId>
-            <artifactId>tinkergraph-gremlin</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.openjdk.jmh</groupId>
             <artifactId>jmh-core</artifactId>
             <version>${jmh.version}</version>
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index 3e50091..d04d2e1 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -143,11 +143,6 @@ limitations under the License.
             <version>1.2</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-compress</artifactId>
-            <version>1.19</version>
-        </dependency>
-        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>16.0.1</version>