You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/30 23:02:14 UTC
git commit: Use a single Netty buffer to read all message bytes and
grow as needed.
Repository: qpid-jms
Updated Branches:
refs/heads/master a2992a757 -> 867c1ba43
Use a single Netty buffer to read all message bytes and grow as needed.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/867c1ba4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/867c1ba4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/867c1ba4
Branch: refs/heads/master
Commit: 867c1ba43446885f90bbc0a09e134976ac28aaee
Parents: a2992a7
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Sep 30 17:02:03 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 30 17:02:03 2014 -0400
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpConsumer.java | 24 +++++++++++---------
1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/867c1ba4/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 41fef20..bced2b4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -16,7 +16,9 @@
*/
package org.apache.qpid.jms.provider.amqp;
-import java.io.ByteArrayOutputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -64,12 +66,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
protected static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
protected static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
+ private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128;
+
protected final AmqpSession session;
protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
protected boolean presettle;
- private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream();
- private final byte incomingBuffer[] = new byte[1024 * 64];
+ private final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
private final AtomicLong _incomingSequence = new AtomicLong(0);
@@ -418,22 +421,21 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
// TODO - Find more efficient ways to produce the Message instance.
protected Message decodeIncomingMessage(Delivery incoming) {
- byte[] buffer;
int count;
- while ((count = endpoint.recv(incomingBuffer, 0, incomingBuffer.length)) > 0) {
- streamBuffer.write(incomingBuffer, 0, count);
+ while ((count = endpoint.recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) {
+ incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count);
+ if (!incomingBuffer.isWritable()) {
+ incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5));
+ }
}
- // TODO - This will copy, replace with something better later. Pooled Netty Buffer ?
- buffer = streamBuffer.toByteArray();
-
try {
Message protonMessage = Message.Factory.create();
- protonMessage.decode(buffer, 0, buffer.length);
+ protonMessage.decode(incomingBuffer.array(), 0, incomingBuffer.readableBytes());
return protonMessage;
} finally {
- streamBuffer.reset();
+ incomingBuffer.clear();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org