You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/06/09 21:58:47 UTC
qpid-broker-j git commit: QPID-8202: [Broker-J][AMQP 0-9] Make sure
that message content is loaded from disk only once before sending it to the
client
Repository: qpid-broker-j
Updated Branches:
refs/heads/6.1.x 33a653380 -> a53709ba6
QPID-8202: [Broker-J][AMQP 0-9] Make sure that message content is loaded from disk only once before sending it to the client
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a53709ba
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a53709ba
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a53709ba
Branch: refs/heads/6.1.x
Commit: a53709ba6d4373b037dbd54e01b3dba28fef6d4c
Parents: 33a6533
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 5 23:38:32 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Sat Jun 9 22:57:56 2018 +0100
----------------------------------------------------------------------
.../v0_8/ProtocolOutputConverterImpl.java | 173 +++++++++++--------
1 file changed, 101 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a53709ba/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index a66318a..85ae75a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -216,27 +216,40 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
else
{
- int maxBodySize = (int) _connection.getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
- int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
- int writtenSize = capacity;
-
- AMQBody firstContentBody = new MessageContentSourceBody(content, 0, capacity);
-
- CompositeAMQBodyBlock
- compositeBlock =
- new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- while (writtenSize < bodySize)
+ int maxFrameBodySize = (int) _connection.getMaxFrameSize() - AMQFrame.getFrameOverhead();
+ Collection<QpidByteBuffer> contentByteBuffers = content.getContent(0, bodySize);
+ try
{
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- AMQBody body = new MessageContentSourceBody(content, writtenSize, capacity);
- writtenSize += capacity;
+ int contentChunkSize = bodySize > maxFrameBodySize ? maxFrameBodySize : bodySize;
+ Collection<QpidByteBuffer> chunk = getContent(contentByteBuffers, 0, contentChunkSize);
+ try
+ {
+ writeFrame(new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, new MessageContentSourceBody(chunk)));
+ }
+ finally
+ {
+ dispose(chunk);
+ }
- writeFrame(new AMQFrame(channelId, body));
+ int writtenSize = contentChunkSize;
+ while (writtenSize < bodySize)
+ {
+ contentChunkSize = bodySize - writtenSize > maxFrameBodySize ? maxFrameBodySize : bodySize - writtenSize;
+ chunk = getContent(contentByteBuffers, writtenSize, contentChunkSize);
+ try
+ {
+ writeFrame(new AMQFrame(channelId, new MessageContentSourceBody(chunk)));
+ writtenSize += contentChunkSize;
+ }
+ finally
+ {
+ dispose(chunk);
+ }
+ }
+ }
+ finally
+ {
+ dispose(contentByteBuffers);
}
}
}
@@ -246,18 +259,76 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding());
}
+ private static void dispose(final Collection<QpidByteBuffer> contentByteBuffers)
+ {
+ for (QpidByteBuffer qbb: contentByteBuffers)
+ {
+ qbb.dispose();
+ }
+ }
+
+ private static Collection<QpidByteBuffer> getContent(final Collection<QpidByteBuffer> messageContent,
+ final int offset,
+ int length)
+ {
+ Collection<QpidByteBuffer> content = new ArrayList<>(messageContent.size());
+ int pos = 0;
+ for (QpidByteBuffer buf : messageContent)
+ {
+ if (length > 0)
+ {
+ int bufRemaining = buf.remaining();
+ if (pos + bufRemaining <= offset)
+ {
+ pos += bufRemaining;
+ }
+ else if (pos >= offset)
+ {
+ buf = buf.duplicate();
+ if (bufRemaining <= length)
+ {
+ length -= bufRemaining;
+ }
+ else
+ {
+ buf.limit(length);
+ length = 0;
+ }
+ content.add(buf);
+ pos+=buf.remaining();
+
+ }
+ else
+ {
+ int offsetInBuf = offset - pos;
+ int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+ final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+ content.add(bufView);
+ length -= limit;
+ pos+=limit+offsetInBuf;
+ }
+ }
+
+ }
+ return content;
+
+ }
+
private class MessageContentSourceBody implements AMQBody
{
public static final byte TYPE = 3;
private final int _length;
- private final MessageContentSource _content;
- private final int _offset;
+ private final Collection<QpidByteBuffer> _content;
- public MessageContentSourceBody(MessageContentSource content, int offset, int length)
+ MessageContentSourceBody(Collection<QpidByteBuffer> content)
{
_content = content;
- _offset = offset;
- _length = length;
+ int size = 0;
+ for (QpidByteBuffer qbb: content)
+ {
+ size += qbb.remaining();
+ }
+ _length = size;
}
public byte getFrameType()
@@ -273,13 +344,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@Override
public long writePayload(final ByteBufferSender sender)
{
- long size = 0L;
- for(QpidByteBuffer buf : _content.getContent(_offset, _length))
+ long size = 0;
+ for (QpidByteBuffer qbb: _content)
{
- size += buf.remaining();
-
- sender.send(buf);
- buf.dispose();
+ size += qbb.remaining();
+ sender.send(qbb);
}
return size;
}
@@ -292,7 +361,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@Override
public String toString()
{
- return "[" + getClass().getSimpleName() + " offset: " + _offset + ", length: " + _length + "]";
+ return "[" + getClass().getSimpleName() + " length: " + _length + "]";
}
}
@@ -561,47 +630,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@Override
public Collection<QpidByteBuffer> getContent(final int offset, int length)
{
- Collection<QpidByteBuffer> content = new ArrayList<>(_buffers.size());
- int pos = 0;
- for (QpidByteBuffer buf : _buffers)
- {
- if (length > 0)
- {
- int bufRemaining = buf.remaining();
- if (pos + bufRemaining <= offset)
- {
- pos += bufRemaining;
- }
- else if (pos >= offset)
- {
- buf = buf.duplicate();
- if (bufRemaining <= length)
- {
- length -= bufRemaining;
- }
- else
- {
- buf.limit(length);
- length = 0;
- }
- content.add(buf);
- pos+=buf.remaining();
-
- }
- else
- {
- int offsetInBuf = offset - pos;
- int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
- final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
- content.add(bufView);
- length -= limit;
- pos+=limit+offsetInBuf;
- }
- }
-
- }
- return content;
-
+ return ProtocolOutputConverterImpl.getContent(_buffers, offset, length);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org