You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/06/09 21:58:47 UTC

qpid-broker-j git commit: QPID-8202: [Broker-J][AMQP 0-9] Make sure that message content is loaded from disk only once before sending it to the client

Repository: qpid-broker-j
Updated Branches:
  refs/heads/6.1.x 33a653380 -> a53709ba6


QPID-8202: [Broker-J][AMQP 0-9] Make sure that message content is loaded from disk only once before sending it to the client


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a53709ba
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a53709ba
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a53709ba

Branch: refs/heads/6.1.x
Commit: a53709ba6d4373b037dbd54e01b3dba28fef6d4c
Parents: 33a6533
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 5 23:38:32 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Sat Jun 9 22:57:56 2018 +0100

----------------------------------------------------------------------
 .../v0_8/ProtocolOutputConverterImpl.java       | 173 +++++++++++--------
 1 file changed, 101 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a53709ba/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index a66318a..85ae75a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -216,27 +216,40 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         }
         else
         {
-            int maxBodySize = (int) _connection.getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
-            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
-            int writtenSize = capacity;
-
-            AMQBody firstContentBody = new MessageContentSourceBody(content, 0, capacity);
-
-            CompositeAMQBodyBlock
-                    compositeBlock =
-                    new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
-            writeFrame(compositeBlock);
-
-            while (writtenSize < bodySize)
+            int maxFrameBodySize = (int) _connection.getMaxFrameSize() - AMQFrame.getFrameOverhead();
+            Collection<QpidByteBuffer> contentByteBuffers = content.getContent(0, bodySize);
+            try
             {
-                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
-                AMQBody body = new MessageContentSourceBody(content, writtenSize, capacity);
-                writtenSize += capacity;
+                int contentChunkSize = bodySize > maxFrameBodySize ? maxFrameBodySize : bodySize;
+                Collection<QpidByteBuffer> chunk = getContent(contentByteBuffers, 0, contentChunkSize);
+                try
+                {
+                    writeFrame(new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, new MessageContentSourceBody(chunk)));
+                }
+                finally
+                {
+                    dispose(chunk);
+                }
 
-                writeFrame(new AMQFrame(channelId, body));
+                int writtenSize = contentChunkSize;
+                while (writtenSize < bodySize)
+                {
+                    contentChunkSize = bodySize - writtenSize > maxFrameBodySize ? maxFrameBodySize : bodySize - writtenSize;
+                    chunk = getContent(contentByteBuffers, writtenSize, contentChunkSize);
+                    try
+                    {
+                        writeFrame(new AMQFrame(channelId, new MessageContentSourceBody(chunk)));
+                        writtenSize += contentChunkSize;
+                    }
+                    finally
+                    {
+                        dispose(chunk);
+                    }
+                }
+            }
+            finally
+            {
+                dispose(contentByteBuffers);
             }
         }
     }
@@ -246,18 +259,76 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         return GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding());
     }
 
+    private static void dispose(final Collection<QpidByteBuffer> contentByteBuffers)
+    {
+        for (QpidByteBuffer qbb: contentByteBuffers)
+        {
+            qbb.dispose();
+        }
+    }
+
+    private static Collection<QpidByteBuffer> getContent(final Collection<QpidByteBuffer> messageContent,
+                                                         final int offset,
+                                                         int length)
+    {
+        Collection<QpidByteBuffer> content = new ArrayList<>(messageContent.size());
+        int pos = 0;
+        for (QpidByteBuffer buf : messageContent)
+        {
+            if (length > 0)
+            {
+                int bufRemaining = buf.remaining();
+                if (pos + bufRemaining <= offset)
+                {
+                    pos += bufRemaining;
+                }
+                else if (pos >= offset)
+                {
+                    buf = buf.duplicate();
+                    if (bufRemaining <= length)
+                    {
+                        length -= bufRemaining;
+                    }
+                    else
+                    {
+                        buf.limit(length);
+                        length = 0;
+                    }
+                    content.add(buf);
+                    pos+=buf.remaining();
+
+                }
+                else
+                {
+                    int offsetInBuf = offset - pos;
+                    int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+                    final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+                    content.add(bufView);
+                    length -= limit;
+                    pos+=limit+offsetInBuf;
+                }
+            }
+
+        }
+        return content;
+
+    }
+
     private class MessageContentSourceBody implements AMQBody
     {
         public static final byte TYPE = 3;
         private final int _length;
-        private final MessageContentSource _content;
-        private final int _offset;
+        private final Collection<QpidByteBuffer> _content;
 
-        public MessageContentSourceBody(MessageContentSource content, int offset, int length)
+        MessageContentSourceBody(Collection<QpidByteBuffer> content)
         {
             _content = content;
-            _offset = offset;
-            _length = length;
+             int size = 0;
+            for (QpidByteBuffer qbb: content)
+            {
+                size += qbb.remaining();
+            }
+            _length = size;
         }
 
         public byte getFrameType()
@@ -273,13 +344,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         @Override
         public long writePayload(final ByteBufferSender sender)
         {
-            long size = 0L;
-            for(QpidByteBuffer buf : _content.getContent(_offset, _length))
+            long size = 0;
+            for (QpidByteBuffer qbb: _content)
             {
-                size += buf.remaining();
-
-                sender.send(buf);
-                buf.dispose();
+                size += qbb.remaining();
+                sender.send(qbb);
             }
             return size;
         }
@@ -292,7 +361,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         @Override
         public String toString()
         {
-            return "[" + getClass().getSimpleName() + " offset: " + _offset + ", length: " + _length + "]";
+            return "[" + getClass().getSimpleName() + " length: " + _length + "]";
         }
 
     }
@@ -561,47 +630,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
         @Override
         public Collection<QpidByteBuffer> getContent(final int offset, int length)
         {
-            Collection<QpidByteBuffer> content = new ArrayList<>(_buffers.size());
-            int pos = 0;
-            for (QpidByteBuffer buf : _buffers)
-            {
-                if (length > 0)
-                {
-                    int bufRemaining = buf.remaining();
-                    if (pos + bufRemaining <= offset)
-                    {
-                        pos += bufRemaining;
-                    }
-                    else if (pos >= offset)
-                    {
-                        buf = buf.duplicate();
-                        if (bufRemaining <= length)
-                        {
-                            length -= bufRemaining;
-                        }
-                        else
-                        {
-                            buf.limit(length);
-                            length = 0;
-                        }
-                        content.add(buf);
-                        pos+=buf.remaining();
-
-                    }
-                    else
-                    {
-                        int offsetInBuf = offset - pos;
-                        int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
-                        final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
-                        content.add(bufView);
-                        length -= limit;
-                        pos+=limit+offsetInBuf;
-                    }
-                }
-
-            }
-            return content;
-
+            return ProtocolOutputConverterImpl.getContent(_buffers, offset, length);
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org