You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/30 23:02:14 UTC

git commit: Use a single Netty buffer to read all message bytes and grow as needed.

Repository: qpid-jms
Updated Branches:
  refs/heads/master a2992a757 -> 867c1ba43


Use a single Netty buffer to read all message bytes and grow as needed.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/867c1ba4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/867c1ba4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/867c1ba4

Branch: refs/heads/master
Commit: 867c1ba43446885f90bbc0a09e134976ac28aaee
Parents: a2992a7
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Sep 30 17:02:03 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 30 17:02:03 2014 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/867c1ba4/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 41fef20..bced2b4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -16,7 +16,9 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
-import java.io.ByteArrayOutputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -64,12 +66,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
     protected static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
     protected static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
 
+    private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128;
+
     protected final AmqpSession session;
     protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
     protected boolean presettle;
 
-    private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream();
-    private final byte incomingBuffer[] = new byte[1024 * 64];
+    private final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
 
     private final AtomicLong _incomingSequence = new AtomicLong(0);
 
@@ -418,22 +421,21 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
 
     // TODO - Find more efficient ways to produce the Message instance.
     protected Message decodeIncomingMessage(Delivery incoming) {
-        byte[] buffer;
         int count;
 
-        while ((count = endpoint.recv(incomingBuffer, 0, incomingBuffer.length)) > 0) {
-            streamBuffer.write(incomingBuffer, 0, count);
+        while ((count = endpoint.recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) {
+            incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count);
+            if (!incomingBuffer.isWritable()) {
+                incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5));
+            }
         }
 
-        // TODO - This will copy, replace with something better later.  Pooled Netty Buffer ?
-        buffer = streamBuffer.toByteArray();
-
         try {
             Message protonMessage = Message.Factory.create();
-            protonMessage.decode(buffer, 0, buffer.length);
+            protonMessage.decode(incomingBuffer.array(), 0, incomingBuffer.readableBytes());
             return protonMessage;
         } finally {
-            streamBuffer.reset();
+            incomingBuffer.clear();
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org