You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by va...@apache.org on 2024/04/08 21:04:23 UTC

(tinkerpop) 01/01: PoC

This is an automated email from the ASF dual-hosted git repository.

valentyn pushed a commit to branch valentyn/http-java-driver
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit af665f98178a4ad05835edfa08a0600bf092d129
Author: Valentyn Kahamlyk <Va...@improving.com>
AuthorDate: Mon Apr 8 14:04:12 2024 -0700

    PoC
---
 .../tinkerpop/gremlin/driver/Channelizer.java      |  32 +--
 .../apache/tinkerpop/gremlin/driver/Handler.java   | 282 ---------------------
 .../tinkerpop/gremlin/driver/ResultQueue.java      |   9 +-
 .../driver/handler/GremlinResponseHandler.java     | 135 ++++++++++
 .../driver/handler/HttpGremlinRequestEncoder.java  |  11 +
 .../HttpGremlinResponseDebugStreamDecoder.java     |  40 +++
 .../driver/handler/HttpGremlinResponseDecoder.java |   1 +
 .../handler/HttpGremlinResponseStreamDecoder.java  |  92 +++++++
 .../gremlin/driver/simple/SimpleHttpClient.java    |  22 +-
 .../gremlin/server/HttpDriverIntegrateTest.java    |  61 +----
 gremlin-server/src/test/resources/logback-test.xml |   2 +-
 11 files changed, 325 insertions(+), 362 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index a626e0160b..fae760178d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -18,28 +18,29 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
-import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
-import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.DefaultHttpHeaders;
 import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.DefaultHttpHeaders;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketVersion;
 import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.timeout.IdleStateHandler;
-
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder;
+import org.apache.tinkerpop.gremlin.util.ser.MessageChunkSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -144,8 +145,7 @@ public interface Channelizer extends ChannelHandler {
             }
 
             configure(pipeline);
-            pipeline.addLast(PIPELINE_GREMLIN_SASL_HANDLER, new Handler.GremlinSaslAuthenticationHandler(cluster.authProperties()));
-            pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new Handler.GremlinResponseHandler(pending));
+            pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending));
         }
     }
 
@@ -250,15 +250,15 @@ public interface Channelizer extends ChannelHandler {
     /**
      * Sends requests over the HTTP endpoint. Client functionality is governed by the limitations of the HTTP endpoint,
      * meaning that sessions are not available and as such {@code tx()} (i.e. transactions) are not available over this
-     * channelizer. Only sessionless requests are possible. Some driver configuration options may not be relevant when
-     * using HTTP, such as {@link Tokens#ARGS_BATCH_SIZE} since HTTP does not stream results back in that fashion.
+     * channelizer. Only sessionless requests are possible.
      */
     public final class HttpChannelizer extends AbstractChannelizer {
 
         private HttpClientCodec handler;
 
         private HttpGremlinRequestEncoder gremlinRequestEncoder;
-        private HttpGremlinResponseDecoder gremlinResponseDecoder;
+        // private HttpGremlinResponseDecoder gremlinResponseDecoder;
+        private HttpGremlinResponseStreamDecoder gremlinResponseDecoder;
 
         @Override
         public void init(final Connection connection) {
@@ -269,7 +269,7 @@ public interface Channelizer extends ChannelHandler {
                 throw new IllegalStateException(String.format("Cannot use sessions or tx() with %s", HttpChannelizer.class.getSimpleName()));
 
             gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptor(), cluster.isUserAgentOnConnectEnabled());
-            gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer());
+            gremlinResponseDecoder = new HttpGremlinResponseStreamDecoder((MessageChunkSerializer<?>) cluster.getSerializer());
         }
 
         @Override
@@ -296,7 +296,7 @@ public interface Channelizer extends ChannelHandler {
             handler = new HttpClientCodec();
 
             pipeline.addLast("http-codec", handler);
-            pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
+            // pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
             pipeline.addLast("gremlin-encoder", gremlinRequestEncoder);
             pipeline.addLast("gremlin-decoder", gremlinResponseDecoder);
         }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
deleted file mode 100644
index c76a20e6a7..0000000000
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver;
-
-import io.netty.util.AttributeMap;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
-import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.util.Attribute;
-import io.netty.util.AttributeKey;
-import io.netty.util.ReferenceCountUtil;
-import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.security.PrivilegedActionException;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-/**
- * Holder for internal handler classes used in constructing the channel pipeline.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-final class Handler {
-
-    /**
-     * Generic SASL handler that will authenticate against the gremlin server.
-     */
-    static class GremlinSaslAuthenticationHandler extends SimpleChannelInboundHandler<ResponseMessage> implements CallbackHandler {
-        private static final Logger logger = LoggerFactory.getLogger(GremlinSaslAuthenticationHandler.class);
-        private static final AttributeKey<Subject> subjectKey = AttributeKey.valueOf("subject");
-        private static final AttributeKey<SaslClient> saslClientKey = AttributeKey.valueOf("saslclient");
-        private static final Map<String, String> SASL_PROPERTIES = new HashMap<String, String>() {{ put(Sasl.SERVER_AUTH, "true"); }};
-        private static final byte[] NULL_CHALLENGE = new byte[0];
-
-        private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
-        private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
-
-        private final AuthProperties authProps;
-
-        public GremlinSaslAuthenticationHandler(final AuthProperties authProps) {
-            this.authProps = authProps;
-        }
-
-        @Override
-        protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
-            // We are only interested in AUTHENTICATE responses here. Everything else can
-            // get passed down the pipeline
-            if (response.getStatus().getCode() == ResponseStatusCode.AUTHENTICATE) {
-                final Attribute<SaslClient> saslClient = ((AttributeMap) channelHandlerContext).attr(saslClientKey);
-                final Attribute<Subject> subject = ((AttributeMap) channelHandlerContext).attr(subjectKey);
-                final RequestMessage.Builder messageBuilder = RequestMessage.build(Tokens.OPS_AUTHENTICATION);
-                // First time through we don't have a sasl client
-                if (saslClient.get() == null) {
-                    subject.set(login());
-                    try {
-                        saslClient.set(saslClient(getHostName(channelHandlerContext)));
-                    } catch (SaslException saslException) {
-                        // push the sasl error into a failure response from the server. this ensures that standard
-                        // processing for the ResultQueue is kept. without this SaslException trap and subsequent
-                        // conversion to an authentication failure, the close() of the connection might not
-                        // succeed as it will appear as though pending messages remain present in the queue on the
-                        // connection and the shutdown won't proceed
-                        final ResponseMessage clientSideError = ResponseMessage.build(response.getRequestId())
-                                .code(ResponseStatusCode.FORBIDDEN).statusMessage(saslException.getMessage()).create();
-                        channelHandlerContext.fireChannelRead(clientSideError);
-                        return;
-                    }
-
-                    messageBuilder.addArg(Tokens.ARGS_SASL_MECHANISM, getMechanism());
-                    messageBuilder.addArg(Tokens.ARGS_SASL, saslClient.get().hasInitialResponse() ?
-                            BASE64_ENCODER.encodeToString(evaluateChallenge(subject, saslClient, NULL_CHALLENGE)) : null);
-                } else {
-                    // the server sends base64 encoded sasl as well as the byte array. the byte array will eventually be
-                    // phased out, but is present now for backward compatibility in 3.2.x
-                    final String base64sasl = response.getStatus().getAttributes().containsKey(Tokens.ARGS_SASL) ?
-                        response.getStatus().getAttributes().get(Tokens.ARGS_SASL).toString() :
-                        BASE64_ENCODER.encodeToString((byte[]) response.getResult().getData());
-
-                    messageBuilder.addArg(Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString(
-                        evaluateChallenge(subject, saslClient, BASE64_DECODER.decode(base64sasl))));
-                }
-                channelHandlerContext.writeAndFlush(messageBuilder.create());
-            } else {
-                // SimpleChannelInboundHandler will release the frame if we don't retain it explicitly.
-                ReferenceCountUtil.retain(response);
-                channelHandlerContext.fireChannelRead(response);
-            }
-        }
-
-        public void handle(final Callback[] callbacks) {
-            for (Callback callback : callbacks) {
-                if (callback instanceof NameCallback) {
-                    if (authProps.get(AuthProperties.Property.USERNAME) != null) {
-                        ((NameCallback)callback).setName(authProps.get(AuthProperties.Property.USERNAME));
-                    }
-                } else if (callback instanceof PasswordCallback) {
-                    if (authProps.get(AuthProperties.Property.PASSWORD) != null) {
-                        ((PasswordCallback)callback).setPassword(authProps.get(AuthProperties.Property.PASSWORD).toCharArray());
-                    }
-                } else {
-                    logger.warn("SASL handler got a callback of type " + callback.getClass().getCanonicalName());
-                }
-            }
-        }
-
-        private byte[] evaluateChallenge(final Attribute<Subject> subject, final Attribute<SaslClient> saslClient,
-                                         final byte[] challenge) throws SaslException {
-
-            if (subject.get() == null) {
-                return saslClient.get().evaluateChallenge(challenge);
-            } else {
-                // If we have a subject then run this as a privileged action using the subject
-                try {
-                    return Subject.doAs(subject.get(), (PrivilegedExceptionAction<byte[]>) () -> saslClient.get().evaluateChallenge(challenge));
-                } catch (PrivilegedActionException e) {
-                    throw (SaslException)e.getException();
-                }
-            }
-        }
-
-        private Subject login() throws LoginException {
-            // Login if the user provided us with an entry into the JAAS config file
-            if (authProps.get(AuthProperties.Property.JAAS_ENTRY) != null) {
-                final LoginContext login = new LoginContext(authProps.get(AuthProperties.Property.JAAS_ENTRY));
-                login.login();
-                return login.getSubject();
-            }
-            return null;
-        }
-
-        private SaslClient saslClient(final String hostname) throws SaslException {
-            return Sasl.createSaslClient(new String[] { getMechanism() }, null, authProps.get(AuthProperties.Property.PROTOCOL),
-                                         hostname, SASL_PROPERTIES, this);
-        }
-
-        private String getHostName(final ChannelHandlerContext channelHandlerContext) {
-            return ((InetSocketAddress)channelHandlerContext.channel().remoteAddress()).getAddress().getHostName();
-        }
-
-        /**
-         * Work out the Sasl mechanism based on the user supplied parameters.
-         * If we have a username and password use PLAIN otherwise GSSAPI
-         * ToDo: have gremlin-server provide the mechanism(s) it is configured with, so that additional mechanisms can
-         * be supported in the driver and confusing GSSException messages from the driver are avoided
-         */
-        private String getMechanism() {
-            if ((authProps.get(AuthProperties.Property.USERNAME) != null) &&
-                (authProps.get(AuthProperties.Property.PASSWORD) != null)) {
-                return "PLAIN";
-            } else {
-                return "GSSAPI";
-            }
-        }
-    }
-
-    /**
-     * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request
-     * as the {@link ResponseMessage} objects are deserialized.
-     */
-    static class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
-        private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class);
-        private final ConcurrentMap<UUID, ResultQueue> pending;
-
-        public GremlinResponseHandler(final ConcurrentMap<UUID, ResultQueue> pending) {
-            this.pending = pending;
-        }
-
-        @Override
-        public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
-            // occurs when the server shuts down in a disorderly fashion, otherwise in an orderly shutdown the server
-            // should fire off a close message which will properly release the driver.
-            super.channelInactive(ctx);
-
-            // the channel isn't going to get anymore results as it is closed so release all pending requests
-            pending.values().forEach(val -> val.markError(new IllegalStateException("Connection to server is no longer active")));
-            pending.clear();
-        }
-
-        @Override
-        protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
-            final ResponseStatusCode statusCode = response.getStatus().getCode();
-            final ResultQueue queue = pending.get(response.getRequestId());
-            if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
-                final Object data = response.getResult().getData();
-
-                // this is a "result" from the server which is either the result of a script or a
-                // serialized traversal
-                if (data instanceof List) {
-                    // unrolls the collection into individual results to be handled by the queue.
-                    final List<Object> listToUnroll = (List<Object>) data;
-                    listToUnroll.forEach(item -> queue.add(new Result(item)));
-                } else {
-                    // since this is not a list it can just be added to the queue
-                    queue.add(new Result(response.getResult().getData()));
-                }
-            } else {
-                // this is a "success" but represents no results otherwise it is an error
-                if (statusCode != ResponseStatusCode.NO_CONTENT) {
-                    final Map<String,Object> attributes = response.getStatus().getAttributes();
-                    final String stackTrace = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) ?
-                            (String) attributes.get(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) : null;
-                    final List<String> exceptions = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) ?
-                            (List<String>) attributes.get(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) : null;
-                    queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(),
-                            exceptions, stackTrace, cleanStatusAttributes(attributes)));
-                }
-            }
-
-            // as this is a non-PARTIAL_CONTENT code - the stream is done.
-            if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
-                pending.remove(response.getRequestId()).markComplete(response.getStatus().getAttributes());
-            }
-        }
-
-        @Override
-        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
-            // if this happens enough times (like the client is unable to deserialize a response) the pending
-            // messages queue will not clear.  wonder if there is some way to cope with that.  of course, if
-            // there are that many failures someone would take notice and hopefully stop the client.
-            logger.error("Could not process the response", cause);
-
-            // the channel took an error because of something pretty bad so release all the futures out there
-            pending.values().forEach(val -> val.markError(cause));
-            pending.clear();
-
-            // serialization exceptions should not close the channel - that's worth a retry
-            if (!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t -> t instanceof SerializationException))
-                if (ctx.channel().isActive()) ctx.close();
-        }
-
-        private Map<String,Object> cleanStatusAttributes(final Map<String,Object> statusAttributes) {
-            final Map<String,Object> m = new HashMap<>();
-            statusAttributes.forEach((k,v) -> {
-                if (!k.equals(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) && !k.equals(Tokens.STATUS_ATTRIBUTE_STACK_TRACE))
-                    m.put(k,v);
-            });
-            return m;
-        }
-    }
-
-}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 8ce4fba21c..c4e827503d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.javatuples.Pair;
 
@@ -33,13 +34,13 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * A queue of incoming {@link Result} objects.  The queue is updated by the {@link Handler.GremlinResponseHandler}
+ * A queue of incoming {@link Result} objects.  The queue is updated by the {@link GremlinResponseHandler}
  * until a response terminator is identified.
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-final class ResultQueue {
+public final class ResultQueue {
 
     private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue;
 
@@ -104,7 +105,7 @@ final class ResultQueue {
         resultLinkedBlockingQueue.drainTo(collection);
     }
 
-    void markComplete(final Map<String,Object> statusAttributes) {
+     public void markComplete(final Map<String, Object> statusAttributes) {
         // if there was some aggregation performed in the queue then the full object is hanging out waiting to be
         // added to the ResultSet
         if (aggregatedResult != null)
@@ -117,7 +118,7 @@ final class ResultQueue {
         this.drainAllWaiting();
     }
 
-    void markError(final Throwable throwable) {
+    public void markError(final Throwable throwable) {
         error.set(throwable);
         this.readComplete.completeExceptionally(throwable);
         this.drainAllWaiting();
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
new file mode 100644
index 0000000000..daa9384b1c
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.AttributeMap;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultQueue;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder.REQUEST_ID;
+
+/**
+ * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request
+ * as the {@link ResponseMessage} objects are deserialized.
+ */
+public class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
+    private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class);
+    private final ConcurrentMap<UUID, ResultQueue> pending;
+
+    public GremlinResponseHandler(final ConcurrentMap<UUID, ResultQueue> pending) {
+        this.pending = pending;
+    }
+
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        // occurs when the server shuts down in a disorderly fashion, otherwise in an orderly shutdown the server
+        // should fire off a close message which will properly release the driver.
+        super.channelInactive(ctx);
+
+        // the channel isn't going to get anymore results as it is closed so release all pending requests
+        pending.values().forEach(val -> val.markError(new IllegalStateException("Connection to server is no longer active")));
+        pending.clear();
+    }
+
+    @Override
+    protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
+        final UUID requestId = ((AttributeMap) channelHandlerContext).attr(REQUEST_ID).get();
+
+        final ResponseStatusCode statusCode = response.getStatus() == null ? ResponseStatusCode.PARTIAL_CONTENT : response.getStatus().getCode();
+        final ResultQueue queue = pending.get(requestId);
+        System.out.println("Handler.GremlinResponseHandler get requestId: " + requestId);
+        if (response.getResult().getData() != null) {
+            System.out.println("Handler.GremlinResponseHandler payload size: " + ((List) response.getResult().getData()).size());
+        }
+
+        if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
+            final Object data = response.getResult().getData();
+
+            // this is a "result" from the server which is either the result of a script or a
+            // serialized traversal
+            if (data instanceof List) {
+                // unrolls the collection into individual results to be handled by the queue.
+                final List<Object> listToUnroll = (List<Object>) data;
+                listToUnroll.forEach(item -> queue.add(new Result(item)));
+            } else {
+                // since this is not a list it can just be added to the queue
+                queue.add(new Result(response.getResult().getData()));
+            }
+        } else {
+            // this is a "success" but represents no results otherwise it is an error
+            if (statusCode != ResponseStatusCode.NO_CONTENT) {
+                final Map<String, Object> attributes = response.getStatus().getAttributes();
+                final String stackTrace = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) ?
+                        (String) attributes.get(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) : null;
+                final List<String> exceptions = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) ?
+                        (List<String>) attributes.get(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) : null;
+                queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(),
+                        exceptions, stackTrace, cleanStatusAttributes(attributes)));
+            }
+        }
+
+        // todo:
+        // as this is a non-PARTIAL_CONTENT code - the stream is done.
+        if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
+            pending.remove(requestId).markComplete(response.getStatus().getAttributes());
+        }
+    }
+
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+        // if this happens enough times (like the client is unable to deserialize a response) the pending
+        // messages queue will not clear.  wonder if there is some way to cope with that.  of course, if
+        // there are that many failures someone would take notice and hopefully stop the client.
+        logger.error("Could not process the response", cause);
+
+        // the channel took an error because of something pretty bad so release all the futures out there
+        pending.values().forEach(val -> val.markError(cause));
+        pending.clear();
+
+        // serialization exceptions should not close the channel - that's worth a retry
+        if (!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t -> t instanceof SerializationException))
+            if (ctx.channel().isActive()) ctx.close();
+    }
+
+    private Map<String, Object> cleanStatusAttributes(final Map<String, Object> statusAttributes) {
+        final Map<String, Object> m = new HashMap<>();
+        statusAttributes.forEach((k, v) -> {
+            if (!k.equals(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) && !k.equals(Tokens.STATUS_ATTRIBUTE_STACK_TRACE))
+                m.put(k, v);
+        });
+        return m;
+    }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
index 06377e31be..fc220c6819 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
@@ -27,6 +27,9 @@ import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.AttributeMap;
 import org.apache.tinkerpop.gremlin.driver.UserAgent;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
@@ -37,6 +40,7 @@ import org.apache.tinkerpop.gremlin.util.ser.MessageTextSerializerV4;
 import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
 
 import java.util.List;
+import java.util.UUID;
 import java.util.function.UnaryOperator;
 
 /**
@@ -44,6 +48,9 @@ import java.util.function.UnaryOperator;
  */
 @ChannelHandler.Sharable
 public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<RequestMessageV4> {
+
+    //todo: move
+    public static final AttributeKey<UUID> REQUEST_ID = AttributeKey.valueOf("requestId");
     private final MessageSerializer<?> serializer;
     private final boolean userAgentEnabled;
     private final UnaryOperator<FullHttpRequest> interceptor;
@@ -63,6 +70,10 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req
 
     @Override
     protected void encode(final ChannelHandlerContext channelHandlerContext, final RequestMessageV4 requestMessage, final List<Object> objects) throws Exception {
+        final Attribute<UUID> requestIdAttribute = ((AttributeMap) channelHandlerContext).attr(REQUEST_ID);
+        requestIdAttribute.set(requestMessage.getRequestId());
+        System.out.println("HttpGremlinRequestEncoder set requestId: " + requestIdAttribute.get());
+
         final String mimeType = serializer.mimeTypesSupported()[0];
         // only GraphSON3 and GraphBinary recommended for serialization of Bytecode requests
         if (requestMessage.getArg("gremlin") instanceof Bytecode &&
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDebugStreamDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDebugStreamDecoder.java
new file mode 100644
index 0000000000..35dc49ef30
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDebugStreamDecoder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+
+import java.util.List;
+
+/**
+ * Converts {@code HttpResponse} to a {@link ResponseMessage}.
+ */
+@ChannelHandler.Sharable
+public final class HttpGremlinResponseDebugStreamDecoder extends MessageToMessageDecoder<ResponseMessage> {
+    public HttpGremlinResponseDebugStreamDecoder() {}
+
+    @Override
+    protected void decode(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response, final List<Object> objects) throws Exception {
+        System.out.println("HttpGremlinResponseStreamDecoder: ");
+        System.out.println(response.getResult());
+    }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
index 9b46db27ee..4d7d452239 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
@@ -37,6 +37,7 @@ import java.util.UUID;
 
 /**
  * Converts {@code HttpResponse} to a {@link ResponseMessage}.
+ * Can be used for GraphSON
  */
 @ChannelHandler.Sharable
 public final class HttpGremlinResponseDecoder extends MessageToMessageDecoder<FullHttpResponse> {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
new file mode 100644
index 0000000000..1306f7fff0
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpObject;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.AttributeMap;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.MessageChunkSerializer;
+import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
+
+import java.util.List;
+import java.util.Objects;
+
+public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<DefaultHttpObject> {
+
+    // todo: move out
+    public static final AttributeKey<Boolean> IS_FIRST_CHUNK = AttributeKey.valueOf("isFirstChunk");
+
+    private final MessageChunkSerializer<?> serializer;
+
+    public HttpGremlinResponseStreamDecoder(MessageChunkSerializer<?> serializer) {
+
+        this.serializer = serializer;
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, DefaultHttpObject msg, List<Object> out) throws Exception {
+        final Attribute<Boolean> isFirstChunk = ((AttributeMap) ctx).attr(IS_FIRST_CHUNK);
+
+        if (msg instanceof DefaultHttpResponse) {
+            isFirstChunk.set(true);
+        }
+
+        if (msg instanceof DefaultHttpContent) {
+            try {
+                if (ctx.attr(IS_FIRST_CHUNK).get()) {
+                    System.out.println("first chunk");
+                } else {
+                    System.out.println("not first chunk");
+                }
+
+                final ResponseMessage chunk = serializer.readChunk(((DefaultHttpContent) msg).content(), isFirstChunk.get());
+
+                if (chunk.getResult().getData() != null) {
+                    System.out.println("payload size: " + ((List) chunk.getResult().getData()).size());
+                }
+
+                if (msg instanceof DefaultLastHttpContent) {
+                    final HttpHeaders trailingHeaders = ((DefaultLastHttpContent) msg).trailingHeaders();
+
+                    System.out.println("final chunk, trailing headers:");
+                    System.out.println(trailingHeaders);
+
+                    if (!Objects.equals(trailingHeaders.get("code"), "200")) {
+                        throw new Exception(trailingHeaders.get("message"));
+                    }
+                }
+
+                isFirstChunk.set(false);
+
+                out.add(chunk);
+            } catch (SerializationException e) {
+                System.out.println("Ex: " + e.getMessage());
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
index fd97fa3d6e..8a37dfacb5 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
@@ -20,17 +20,13 @@ package org.apache.tinkerpop.gremlin.driver.simple;
 
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelOption;
-import io.netty.handler.codec.http.EmptyHttpHeaders;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import org.apache.tinkerpop.gremlin.driver.HandshakeInterceptor;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder;
 import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
-import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
-import org.apache.tinkerpop.gremlin.util.MessageSerializer;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDebugStreamDecoder;
 import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
@@ -39,11 +35,9 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.http.HttpClientCodec;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
-import io.netty.handler.codec.http.websocketx.WebSocketVersion;
 import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
 import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryMapper;
+import org.apache.tinkerpop.gremlin.util.ser.MessageChunkSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,7 +92,7 @@ public class SimpleHttpClient extends AbstractClient {
 
 //            final WebSocketClientHandler wsHandler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
 //                    uri, WebSocketVersion.V13, null, true, EmptyHttpHeaders.INSTANCE, 65536), 10000, false);
-            final MessageSerializer<GraphBinaryMapper> serializer = new GraphBinaryMessageSerializerV4();
+            final MessageChunkSerializer<GraphBinaryMapper> serializer = new GraphBinaryMessageSerializerV4();
             b.channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>() {
                         @Override
@@ -109,12 +103,16 @@ public class SimpleHttpClient extends AbstractClient {
                             }
                             p.addLast(
                                     new HttpClientCodec(),
-                                    new HttpObjectAggregator(65536),
+                                    new HttpGremlinResponseStreamDecoder(serializer),
+//                                    new HttpObjectAggregator(65536),
+
 //                                    wsHandler,
 //                                    new WebSocketGremlinRequestEncoder(true, serializer),
 //                                    new WebSocketGremlinResponseDecoder(serializer),
                                     new HttpGremlinRequestEncoder(serializer, HandshakeInterceptor.NO_OP, false),
-                                    new HttpGremlinResponseDecoder(serializer),
+                                    // new HttpGremlinResponseDecoder(serializer),
+                                    new HttpGremlinResponseDebugStreamDecoder(),
+
                                     callbackResponseHandler);
                         }
                     });
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
index 2d08b7722f..6c036da7e5 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
@@ -30,6 +30,8 @@ import org.apache.tinkerpop.gremlin.structure.Transaction;
 import org.apache.tinkerpop.gremlin.util.ser.Serializers;
 import org.junit.Test;
 
+import java.util.List;
+
 import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.AllOf.allOf;
@@ -49,7 +51,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
     public void shouldSubmitScriptWithGraphSON() throws Exception {
         final Cluster cluster = TestClientFactory.build()
                 .channelizer(Channelizer.HttpChannelizer.class)
-                .serializer(Serializers.GRAPHSON_V3)
+                .serializer(Serializers.GRAPHSON_V4)
                 .create();
         try {
             final Client client = cluster.connect();
@@ -65,11 +67,12 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
     public void shouldSubmitScriptWithGraphBinary() throws Exception {
         final Cluster cluster = TestClientFactory.build()
                 .channelizer(Channelizer.HttpChannelizer.class)
-                .serializer(Serializers.GRAPHBINARY_V1)
+                .serializer(Serializers.GRAPHBINARY_V4)
                 .create();
         try {
             final Client client = cluster.connect();
-            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+            // default chunk size is 64
+            assertEquals(100, client.submit("new int[100]").all().get().size());
         } catch (Exception ex) {
             throw ex;
         } finally {
@@ -81,7 +84,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
     public void shouldSubmitBytecodeWithGraphSON() throws Exception {
         final Cluster cluster = TestClientFactory.build()
                 .channelizer(Channelizer.HttpChannelizer.class)
-                .serializer(Serializers.GRAPHSON_V3)
+                .serializer(Serializers.GRAPHSON_V4)
                 .create();
         try {
             final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
@@ -98,7 +101,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
     public void shouldGetErrorForBytecodeWithUntypedGraphSON() throws Exception {
         final Cluster cluster = TestClientFactory.build()
                 .channelizer(Channelizer.HttpChannelizer.class)
-                .serializer(Serializers.GRAPHSON_V2_UNTYPED)
+                .serializer(Serializers.GRAPHSON_V4_UNTYPED)
                 .create();
         try {
             final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
@@ -116,7 +119,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
     public void shouldSubmitBytecodeWithGraphBinary() throws Exception {
         final Cluster cluster = TestClientFactory.build()
                 .channelizer(Channelizer.HttpChannelizer.class)
-                .serializer(Serializers.GRAPHBINARY_V1)
+                .serializer(Serializers.GRAPHBINARY_V4)
                 .create();
         try {
             final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
@@ -129,44 +132,6 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
         }
     }
 
-    @Test
-    public void shouldSubmitMultipleRequestsOverSingleConnection() throws Exception {
-        final Cluster cluster = TestClientFactory.build()
-                .channelizer(Channelizer.HttpChannelizer.class)
-                .minConnectionPoolSize(1).maxConnectionPoolSize(1)
-                .serializer(Serializers.GRAPHBINARY_V1)
-                .create();
-        try {
-            for (int ix = 0; ix < 100; ix++) {
-                final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
-                assertEquals(ix, g.inject(ix).toList().get(0).intValue());
-            }
-        } catch (Exception ex) {
-            throw ex;
-        } finally {
-            cluster.close();
-        }
-    }
-
-    @Test
-    public void shouldSubmitMultipleRequestsOverMultiConnection() throws Exception {
-        final Cluster cluster = TestClientFactory.build()
-                .channelizer(Channelizer.HttpChannelizer.class)
-                .minConnectionPoolSize(1).maxConnectionPoolSize(8)
-                .serializer(Serializers.GRAPHBINARY_V1)
-                .create();
-        try {
-            for (int ix = 0; ix < 100; ix++) {
-                final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
-                assertEquals(ix, g.inject(ix).toList().get(0).intValue());
-            }
-        } catch (Exception ex) {
-            throw ex;
-        } finally {
-            cluster.close();
-        }
-    }
-
     @Test
     public void shouldFailToUseSession() throws Exception {
         final Cluster cluster = TestClientFactory.build()
@@ -205,28 +170,30 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
         }
     }
 
+    // !!!
     @Test
     public void shouldDeserializeErrorWithGraphBinary() throws Exception {
         final Cluster cluster = TestClientFactory.build()
                 .channelizer(Channelizer.HttpChannelizer.class)
-                .serializer(Serializers.GRAPHBINARY_V1)
+                .serializer(Serializers.GRAPHBINARY_V4)
                 .create();
         try {
             final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster, "doesNotExist"));
             g.V().next();
             fail("Expected exception to be thrown.");
         } catch (Exception ex) {
-            assert ex.getMessage().contains("Could not rebind");
+            assert ex.getMessage().contains("The traversal source [doesNotExist] for alias [g] is not configured on the server.");
         } finally {
             cluster.close();
         }
     }
 
+    // !!!
     @Test
     public void shouldDeserializeErrorWithGraphSON() throws Exception {
         final Cluster cluster = TestClientFactory.build()
                 .channelizer(Channelizer.HttpChannelizer.class)
-                .serializer(Serializers.GRAPHSON_V3)
+                .serializer(Serializers.GRAPHSON_V4)
                 .create();
         try {
             final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster, "doesNotExist"));
diff --git a/gremlin-server/src/test/resources/logback-test.xml b/gremlin-server/src/test/resources/logback-test.xml
index 354e97cb98..647f9674e9 100644
--- a/gremlin-server/src/test/resources/logback-test.xml
+++ b/gremlin-server/src/test/resources/logback-test.xml
@@ -25,7 +25,7 @@ limitations under the License.
     <logger name="org.apache.tinkerpop.gremlin.server.AbstractChannelizer" level="ERROR"/>
     <!-- this logger is noisy and we don't assert anything and the error is already tracked on the server so we can
          trim the logs a bit with this. -->
-    <logger name="org.apache.tinkerpop.gremlin.driver.Handler$GremlinResponseHandler" level="OFF"/>
+    <logger name="org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler" level="OFF"/>
     <root level="WARN">
         <appender-ref ref="stdout"/>
     </root>