You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/05/07 21:30:33 UTC

[1/2] incubator-tinkerpop git commit: Small optimization in gremlin-driver to prevent recreation of ResponseMessage instances.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 688850997 -> 97f4ed2df


Small optimization in gremlin-driver to prevent recreation of ResponseMessage instances.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c86826e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c86826e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c86826e0

Branch: refs/heads/master
Commit: c86826e0cbd5343511160b46889764b3db94a477
Parents: 6888509
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu May 7 14:48:49 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu May 7 14:48:49 2015 -0400

----------------------------------------------------------------------
 .../tinkerpop/gremlin/driver/Handler.java       | 23 +++++++++++++++-----
 .../tinkerpop/gremlin/driver/ResponseQueue.java |  2 +-
 2 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c86826e0/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 88a1601..ebfdf11 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -30,12 +30,16 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 
 /**
- * Traverser for internal handler classes for constructing the Channel Pipeline.
+ * Holder for internal handler classes used in constructing the channel pipeline.
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 class Handler {
 
+    /**
+     * Takes a map of requests pending responses and writes responses to the {@link ResponseQueue} of a request
+     * as the {@link ResponseMessage} objects are deserialized.
+     */
     static class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
         private final ConcurrentMap<UUID, ResponseQueue> pending;
 
@@ -50,18 +54,25 @@ class Handler {
                         response.getStatus().getCode() == ResponseStatusCode.PARTIAL_CONTENT) {
                     final Object data = response.getResult().getData();
                     if (data instanceof List) {
-                        // unrolls the collection into individual response messages to be handled by the queue
+                        // unrolls the collection into individual response messages to be handled by the queue. of
+                        // course, this assumes that the list is of size greater than 1 - else it can be treated as
+                        // a normal object as there is no need to create new ResponseMessage instances for it
                         final List<Object> listToUnroll = (List<Object>) data;
                         final ResponseQueue queue = pending.get(response.getRequestId());
-                        listToUnroll.forEach(item -> queue.add(
-                                ResponseMessage.build(response.getRequestId())
-                                        .result(item).create()));
+                        if (listToUnroll.size() == 1) {
+                            queue.add(response);
+                        } else {
+                            listToUnroll.forEach(item -> queue.add(
+                                    ResponseMessage.build(response.getRequestId())
+                                            .result(item).create()));
+                        }
                     } else {
                         // since this is not a list it can just be added to the queue
                         pending.get(response.getRequestId()).add(response);
                     }
-                } else
+                } else {
                     pending.get(response.getRequestId()).markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage()));
+                }
 
                 // todo: should this go in finally? where is the catch?
                 // as this is a non-PARTIAL_CONTENT code - the stream is done

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c86826e0/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResponseQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResponseQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResponseQueue.java
index b3cfbfe..40055b5 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResponseQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResponseQueue.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A queue of incoming {@link ResponseMessage} objects.  The queue is updated by the
- * {@link org.apache.tinkerpop.gremlin.driver.Handler.GremlinResponseHandler} until a response terminator is identified.  At that point the fetch
+ * {@link Handler.GremlinResponseHandler} until a response terminator is identified.  At that point the fetch
  * status is changed to {@link Status#COMPLETE} and all results have made it client side.
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)


[2/2] incubator-tinkerpop git commit: Stop using ResponseMessage as the commodity of result queues.

Posted by sp...@apache.org.
Stop using ResponseMessage as the commodity of result queues.

Instead of flattening List data in ResponseMessages into more ResponseMessages, just convert direction to Result from the GremlinResponseHandler.  No part of the ResponseMessage is used downstream of that point.  Eliminates useless object creation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/97f4ed2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/97f4ed2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/97f4ed2d

Branch: refs/heads/master
Commit: 97f4ed2df960b05033bd7257c617281964da65aa
Parents: c86826e
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu May 7 15:28:30 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu May 7 15:28:30 2015 -0400

----------------------------------------------------------------------
 .../tinkerpop/gremlin/driver/Channelizer.java   |  2 +-
 .../tinkerpop/gremlin/driver/Connection.java    |  9 +-
 .../tinkerpop/gremlin/driver/Handler.java       | 22 ++---
 .../tinkerpop/gremlin/driver/ResponseQueue.java | 97 --------------------
 .../apache/tinkerpop/gremlin/driver/Result.java |  9 +-
 .../tinkerpop/gremlin/driver/ResultQueue.java   | 97 ++++++++++++++++++++
 .../tinkerpop/gremlin/driver/ResultSet.java     | 36 ++++----
 7 files changed, 132 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/97f4ed2d/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index a0a272b..d7bba20 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -64,7 +64,7 @@ public interface Channelizer extends ChannelHandler {
     abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> implements Channelizer {
         protected Connection connection;
         protected Cluster cluster;
-        private ConcurrentMap<UUID, ResponseQueue> pending;
+        private ConcurrentMap<UUID, ResultQueue> pending;
 
         protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler";
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/97f4ed2d/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 14a657d..6c4b4a4 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
-import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
@@ -48,7 +47,7 @@ class Connection {
 
     private final Channel channel;
     private final URI uri;
-    private final ConcurrentMap<UUID, ResponseQueue> pending = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, ResultQueue> pending = new ConcurrentHashMap<>();
     private final Cluster cluster;
     private final ConnectionPool pool;
 
@@ -123,7 +122,7 @@ class Connection {
         return cluster;
     }
 
-    ConcurrentMap<UUID, ResponseQueue> getPending() {
+    ConcurrentMap<UUID, ResultQueue> getPending() {
         return pending;
     }
 
@@ -169,14 +168,14 @@ class Connection {
                         thisConnection.returnToPool();
                         future.completeExceptionally(f.cause());
                     } else {
-                        final LinkedBlockingQueue<ResponseMessage> responseQueue = new LinkedBlockingQueue<>();
+                        final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<>();
                         final CompletableFuture<Void> readCompleted = new CompletableFuture<>();
                         readCompleted.thenAcceptAsync(v -> {
                             thisConnection.returnToPool();
                             if (isClosed() && pending.isEmpty())
                                 shutdown(closeFuture.get());
                         });
-                        final ResponseQueue handler = new ResponseQueue(responseQueue, readCompleted);
+                        final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
                         pending.put(requestMessage.getRequestId(), handler);
                         final ResultSet resultSet = new ResultSet(handler, cluster.executor(), channel,
                                 () -> {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/97f4ed2d/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index ebfdf11..5bc5b38 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -37,13 +37,13 @@ import java.util.concurrent.ConcurrentMap;
 class Handler {
 
     /**
-     * Takes a map of requests pending responses and writes responses to the {@link ResponseQueue} of a request
+     * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request
      * as the {@link ResponseMessage} objects are deserialized.
      */
     static class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
-        private final ConcurrentMap<UUID, ResponseQueue> pending;
+        private final ConcurrentMap<UUID, ResultQueue> pending;
 
-        public GremlinResponseHandler(final ConcurrentMap<UUID, ResponseQueue> pending) {
+        public GremlinResponseHandler(final ConcurrentMap<UUID, ResultQueue> pending) {
             this.pending = pending;
         }
 
@@ -54,21 +54,13 @@ class Handler {
                         response.getStatus().getCode() == ResponseStatusCode.PARTIAL_CONTENT) {
                     final Object data = response.getResult().getData();
                     if (data instanceof List) {
-                        // unrolls the collection into individual response messages to be handled by the queue. of
-                        // course, this assumes that the list is of size greater than 1 - else it can be treated as
-                        // a normal object as there is no need to create new ResponseMessage instances for it
+                        // unrolls the collection into individual results to be handled by the queue.
                         final List<Object> listToUnroll = (List<Object>) data;
-                        final ResponseQueue queue = pending.get(response.getRequestId());
-                        if (listToUnroll.size() == 1) {
-                            queue.add(response);
-                        } else {
-                            listToUnroll.forEach(item -> queue.add(
-                                    ResponseMessage.build(response.getRequestId())
-                                            .result(item).create()));
-                        }
+                        final ResultQueue queue = pending.get(response.getRequestId());
+                        listToUnroll.forEach(item -> queue.add(new Result(item)));
                     } else {
                         // since this is not a list it can just be added to the queue
-                        pending.get(response.getRequestId()).add(response);
+                        pending.get(response.getRequestId()).add(new Result(response.getResult().getData()));
                     }
                 } else {
                     pending.get(response.getRequestId()).markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage()));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/97f4ed2d/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResponseQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResponseQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResponseQueue.java
deleted file mode 100644
index 40055b5..0000000
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResponseQueue.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver;
-
-import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A queue of incoming {@link ResponseMessage} objects.  The queue is updated by the
- * {@link Handler.GremlinResponseHandler} until a response terminator is identified.  At that point the fetch
- * status is changed to {@link Status#COMPLETE} and all results have made it client side.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-class ResponseQueue {
-    public enum Status {
-        FETCHING,
-        COMPLETE
-    }
-
-    private final LinkedBlockingQueue<ResponseMessage> responseQueue;
-
-    private volatile Status status = Status.FETCHING;
-
-    private final AtomicReference<Throwable> error = new AtomicReference<>();
-
-    private final CompletableFuture<Void> readComplete;
-
-    public ResponseQueue(final LinkedBlockingQueue<ResponseMessage> responseQueue, final CompletableFuture<Void> readComplete) {
-        this.responseQueue = responseQueue;
-        this.readComplete = readComplete;
-    }
-
-    public void add(final ResponseMessage msg) {
-        this.responseQueue.offer(msg);
-    }
-
-    public int size() {
-        if (error.get() != null) throw new RuntimeException(error.get());
-        return this.responseQueue.size();
-    }
-
-    public boolean isEmpty() {
-        if (error.get() != null) throw new RuntimeException(error.get());
-        return this.size() == 0;
-    }
-
-    public ResponseMessage poll() {
-        ResponseMessage msg = null;
-        do {
-            if (error.get() != null) throw new RuntimeException(error.get());
-            try {
-                msg = responseQueue.poll(10, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException ie) {
-                error.set(new RuntimeException(ie));
-            }
-        } while (null == msg && status == Status.FETCHING);
-
-        if (error.get() != null) throw new RuntimeException(error.get());
-
-        return msg;
-    }
-
-    public Status getStatus() {
-        return status;
-    }
-
-    void markComplete() {
-        this.status = Status.COMPLETE;
-        this.readComplete.complete(null);
-    }
-
-    void markError(final Throwable throwable) {
-        error.set(throwable);
-        this.readComplete.complete(null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/97f4ed2d/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Result.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Result.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Result.java
index cffafd8..bb3bf44 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Result.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Result.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseResult;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -31,8 +31,11 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 public class Result {
     final Object resultObject;
 
-    public Result(final ResponseMessage response) {
-        this.resultObject = response.getResult().getData();
+    /**
+     * Constructs a "result" from data found in {@link ResponseResult#getData()}.
+     */
+    public Result(final Object responseData) {
+        this.resultObject = responseData;
     }
 
     public String getString() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/97f4ed2d/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
new file mode 100644
index 0000000..35eaffe
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A queue of incoming {@link ResponseMessage} objects.  The queue is updated by the
+ * {@link Handler.GremlinResponseHandler} until a response terminator is identified.  At that point the fetch
+ * status is changed to {@link Status#COMPLETE} and all results have made it client side.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+class ResultQueue {
+    public enum Status {
+        FETCHING,
+        COMPLETE
+    }
+
+    private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue;
+
+    private volatile Status status = Status.FETCHING;
+
+    private final AtomicReference<Throwable> error = new AtomicReference<>();
+
+    private final CompletableFuture<Void> readComplete;
+
+    public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) {
+        this.resultLinkedBlockingQueue = resultLinkedBlockingQueue;
+        this.readComplete = readComplete;
+    }
+
+    public void add(final Result result) {
+        this.resultLinkedBlockingQueue.offer(result);
+    }
+
+    public int size() {
+        if (error.get() != null) throw new RuntimeException(error.get());
+        return this.resultLinkedBlockingQueue.size();
+    }
+
+    public boolean isEmpty() {
+        if (error.get() != null) throw new RuntimeException(error.get());
+        return this.size() == 0;
+    }
+
+    public Result poll() {
+        Result result = null;
+        do {
+            if (error.get() != null) throw new RuntimeException(error.get());
+            try {
+                result = resultLinkedBlockingQueue.poll(10, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException ie) {
+                error.set(new RuntimeException(ie));
+            }
+        } while (null == result && status == Status.FETCHING);
+
+        if (error.get() != null) throw new RuntimeException(error.get());
+
+        return result;
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+    void markComplete() {
+        this.status = Status.COMPLETE;
+        this.readComplete.complete(null);
+    }
+
+    void markError(final Throwable throwable) {
+        error.set(throwable);
+        this.readComplete.complete(null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/97f4ed2d/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index a645afa..889864f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -18,8 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.util.StreamFactory;
 import io.netty.channel.Channel;
 
 import java.util.ArrayList;
@@ -43,15 +41,15 @@ import java.util.stream.StreamSupport;
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public class ResultSet implements Iterable<Result> {
-    private final ResponseQueue responseQueue;
+    private final ResultQueue resultQueue;
     private final ExecutorService executor;
     private final Channel channel;
     private final Supplier<Void> onChannelError;
 
-    public ResultSet(final ResponseQueue responseQueue, final ExecutorService executor,
+    public ResultSet(final ResultQueue resultQueue, final ExecutorService executor,
                      final Channel channel, final Supplier<Void> onChannelError) {
         this.executor = executor;
-        this.responseQueue = responseQueue;
+        this.resultQueue = resultQueue;
         this.channel = channel;
         this.onChannelError = onChannelError;
     }
@@ -60,42 +58,42 @@ public class ResultSet implements Iterable<Result> {
      * Determines if all items have been returned to the client.
      */
     public boolean allItemsAvailable() {
-        return responseQueue.getStatus() == ResponseQueue.Status.COMPLETE;
+        return resultQueue.getStatus() == ResultQueue.Status.COMPLETE;
     }
 
     /**
      * Gets the number of items available on the client.
      */
     public int getAvailableItemCount() {
-        return responseQueue.size();
+        return resultQueue.size();
     }
 
     /**
      * Determines if there are any remaining items being streamed to the client.
      */
     public boolean isExhausted() {
-        if (!responseQueue.isEmpty())
+        if (!resultQueue.isEmpty())
             return false;
 
         internalAwaitItems(1);
 
-        assert !responseQueue.isEmpty() || allItemsAvailable();
-        return responseQueue.isEmpty();
+        assert !resultQueue.isEmpty() || allItemsAvailable();
+        return resultQueue.isEmpty();
     }
 
     /**
      * Get the next {@link Result} from the stream, blocking until one is available.
      */
     public Result one() {
-        ResponseMessage msg = responseQueue.poll();
-        if (msg != null)
-            return new Result(msg);
+        Result result = resultQueue.poll();
+        if (result != null)
+            return result;
 
         internalAwaitItems(1);
 
-        msg = responseQueue.poll();
-        if (msg != null)
-            return new Result(msg);
+        result = resultQueue.poll();
+        if (result != null)
+            return result;
         else
             return null;
     }
@@ -118,9 +116,9 @@ public class ResultSet implements Iterable<Result> {
         return CompletableFuture.supplyAsync(() -> {
             final List<Result> list = new ArrayList<>();
             while (!isExhausted()) {
-                final ResponseMessage msg = responseQueue.poll();
-                if (msg != null)
-                    list.add(new Result(msg));
+                final Result result = resultQueue.poll();
+                if (result != null)
+                    list.add(result);
             }
             return list;
         }, executor);