You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/10/14 10:53:42 UTC

svn commit: r1708561 - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/message/internal/ broker-core/src/mai...

Author: lquack
Date: Wed Oct 14 08:53:42 2015
New Revision: 1708561

URL: http://svn.apache.org/viewvc?rev=1708561&view=rev
Log:
QPID-6735: [Java Broker] Refactor how persisted messages are loaded from disk.

Messages that are loaded from disk are immediately flowed back to disk, potentially releasing the underlying QpidByteBuffer.
Added extra code to avoid reloading message content when creating chunks for delivery, by doing the chunking at a higher level.
Removed the unused offset parameter from MessageContentSource#getContent(ByteBuffer buf, int offset).

Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
    qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
    qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
    qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
    qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
    qpid/java/trunk/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Wed Oct 14 08:53:42 2015
@@ -1040,7 +1040,7 @@ public abstract class AbstractBDBMessage
                 {
                     checkMessageStoreOpen();
                     metaData = (T) getMessageMetaData(_messageId);
-                    _messageDataRef = new MessageDataSoftRef<>(metaData, null);
+                    _messageDataRef = new MessageDataSoftRef<>(metaData, _messageDataRef.getData());
                 }
                 return metaData;
             }
@@ -1078,8 +1078,9 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public synchronized int getContent(int offsetInMessage, final ByteBuffer dst)
+        public synchronized int getContent(final ByteBuffer dst)
         {
+            // These do not need to be disposed of because getContentAsByteBuffer() retains a reference
             Collection<QpidByteBuffer> allContent = getContentAsByteBuffer();
             int length = 0;
             for(QpidByteBuffer contentChunk : allContent)
@@ -1090,6 +1091,9 @@ public abstract class AbstractBDBMessage
             return length;
         }
 
+        /**
+         * returns QBBs containing the content. The caller must not dispose of them because we keep a reference in _messageDataRef.
+         */
         private Collection<QpidByteBuffer> getContentAsByteBuffer()
         {
             Collection<QpidByteBuffer> data = _messageDataRef == null ? Collections.<QpidByteBuffer>emptyList() : _messageDataRef.getData();
@@ -1099,16 +1103,7 @@ public abstract class AbstractBDBMessage
                 {
                     checkMessageStoreOpen();
                     data = AbstractBDBMessageStore.this.getAllContent(_messageId);
-                    T metaData = _messageDataRef.getMetaData();
-                    if (metaData == null)
-                    {
-                        metaData = (T) getMessageMetaData(_messageId);
-                        _messageDataRef = new MessageDataSoftRef<T>(metaData, data);
-                    }
-                    else
-                    {
-                        _messageDataRef.setData(data);
-                    }
+                    _messageDataRef.setData(data);
                 }
                 else
                 {
@@ -1119,44 +1114,14 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public synchronized Collection<QpidByteBuffer> getContent(final int offsetInMessage, final int size)
+        public synchronized Collection<QpidByteBuffer> getContent()
         {
-            int pos = 0;
-            int added = 0;
-
             Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
-            List<QpidByteBuffer> content = new ArrayList<>(bufs.size());
-            for(QpidByteBuffer buf : bufs)
+            Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+            for (QpidByteBuffer buf : bufs)
             {
-                if(pos < offsetInMessage)
-                {
-                    final int remaining = buf.remaining();
-                    if(pos+ remaining >=offsetInMessage)
-                    {
-                        buf = buf.view(offsetInMessage-pos,size);
-
-                        content.add(buf);
-                        added += buf.remaining();
-                    }
-                    pos+= remaining;
-
-                }
-                else
-                {
-                    buf = buf.slice();
-                    if(buf.remaining() > (size-added))
-                    {
-                        buf.limit(size-added);
-                    }
-                    content.add(buf);
-                    added += buf.remaining();
-                }
-                if(added >= size)
-                {
-                    break;
-                }
+                content.add(buf.duplicate());
             }
-
             return content;
         }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Wed Oct 14 08:53:42 2015
@@ -169,15 +169,39 @@ public abstract class AbstractServerMess
     }
 
     @Override
-    final public int getContent(ByteBuffer buf, int offset)
+    final public int getContent(ByteBuffer buf)
     {
-        return getStoredMessage().getContent(offset, buf);
+        StoredMessage<T> storedMessage = getStoredMessage();
+        boolean wasInMemory = storedMessage.isInMemory();
+        try
+        {
+            return storedMessage.getContent(buf);
+        }
+        finally
+        {
+            if (!wasInMemory)
+            {
+                storedMessage.flowToDisk();
+            }
+        }
     }
 
     @Override
-    final public Collection<QpidByteBuffer> getContent(int offset, int size)
+    final public Collection<QpidByteBuffer> getContent()
     {
-        return getStoredMessage().getContent(offset, size);
+        StoredMessage<T> storedMessage = getStoredMessage();
+        boolean wasInMemory = storedMessage.isInMemory();
+        try
+        {
+            return storedMessage.getContent();
+        }
+        finally
+        {
+            if (!wasInMemory)
+            {
+                storedMessage.flowToDisk();
+            }
+        }
     }
 
     final public Object getConnectionReference()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java Wed Oct 14 08:53:42 2015
@@ -28,8 +28,8 @@ import org.apache.qpid.bytebuffer.QpidBy
 
 public interface MessageContentSource
 {
-    int getContent(ByteBuffer buf, int offset);
-    Collection<QpidByteBuffer> getContent(int offset, int size);
+    int getContent(ByteBuffer buf);
+    Collection<QpidByteBuffer> getContent();
 
     long getSize();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Wed Oct 14 08:53:42 2015
@@ -65,7 +65,7 @@ public class InternalMessage extends Abs
     {
         super(msg, null);
         _contentSize = msg.getMetaData().getContentSize();
-        Collection<QpidByteBuffer> bufs = msg.getContent(0, _contentSize);
+        Collection<QpidByteBuffer> bufs = msg.getContent();
 
         try(ObjectInputStream is = new ObjectInputStream(new ByteBufferInputStream(ByteBufferUtils.combine(bufs))))
         {
@@ -224,24 +224,21 @@ public class InternalMessage extends Abs
                     }
 
                     @Override
-                    public int getContent(final int offsetInMessage, final ByteBuffer dst)
+                    public int getContent(final ByteBuffer dst)
                     {
-                        ByteBuffer buffer = ByteBuffer.wrap(bytes);
-                        buffer.position(offsetInMessage);
-                        buffer = buffer.slice();
-                        if (dst.remaining() < buffer.remaining())
+                        int size = bytes.length;
+                        if (dst.remaining() < size)
                         {
-                            buffer.limit(dst.remaining());
+                            size = dst.remaining();
                         }
-                        int pos = dst.position();
-                        dst.put(buffer);
-                        return dst.position() - pos;
+                        dst.put(bytes, 0 ,size);
+                        return size;
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent(final int offsetInMessage, final int size)
+                    public Collection<QpidByteBuffer> getContent()
                     {
-                        return Collections.singleton(QpidByteBuffer.wrap(bytes, offsetInMessage, size));
+                        return Collections.singleton(QpidByteBuffer.wrap(bytes));
                     }
 
                     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Oct 14 08:53:42 2015
@@ -3463,7 +3463,7 @@ public abstract class AbstractQueue<X ex
                             _size = message.getSize();
                             _content = new byte[(int) _size];
                             _found = true;
-                            message.getContent(ByteBuffer.wrap(_content), 0);
+                            message.getContent(ByteBuffer.wrap(_content));
                         }
                         finally
                         {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Wed Oct 14 08:53:42 2015
@@ -1441,7 +1441,7 @@ public abstract class AbstractJDBCMessag
                     try
                     {
                         metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
-                        _messageDataRef = new MessageDataSoftRef<>(metaData,null);
+                        _messageDataRef = new MessageDataSoftRef<>(metaData, _messageDataRef.getData());
                     }
                     catch (SQLException e)
                     {
@@ -1484,8 +1484,9 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public synchronized int getContent(int offsetInMessage, final ByteBuffer dst)
+        public synchronized int getContent(final ByteBuffer dst)
         {
+            // These do not need to be disposed of because getContentAsByteBuffer() retains a reference
             Collection<QpidByteBuffer> allContent = getContentAsByteBuffer();
             int length = 0;
             for(QpidByteBuffer contentChunk : allContent)
@@ -1496,6 +1497,9 @@ public abstract class AbstractJDBCMessag
             return length;
         }
 
+        /**
+         * returns QBBs containing the content. The caller must not dispose of them because we keep a reference in _messageDataRef.
+         */
         private Collection<QpidByteBuffer> getContentAsByteBuffer()
         {
             Collection<QpidByteBuffer> data = _messageDataRef == null ? Collections.<QpidByteBuffer>emptyList() : _messageDataRef.getData();
@@ -1505,23 +1509,7 @@ public abstract class AbstractJDBCMessag
                 {
                     checkMessageStoreOpen();
                     data = AbstractJDBCMessageStore.this.getAllContent(_messageId);
-                    T metaData = _messageDataRef.getMetaData();
-                    if (metaData == null)
-                    {
-                        try
-                        {
-                            metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
-                            _messageDataRef = new MessageDataSoftRef<T>(metaData, data);
-                        }
-                        catch (SQLException e)
-                        {
-                            throw new StoreException("Failed to get content for message id " + _messageId, e);
-                        }
-                    }
-                    else
-                    {
-                        _messageDataRef.setData(data);
-                    }
+                    _messageDataRef.setData(data);
                 }
                 else
                 {
@@ -1532,44 +1520,14 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public synchronized Collection<QpidByteBuffer> getContent(final int offsetInMessage, final int size)
+        public synchronized Collection<QpidByteBuffer> getContent()
         {
-            int pos = 0;
-            int added = 0;
-
             Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
-            List<QpidByteBuffer> content = new ArrayList<>(bufs.size());
-            for(QpidByteBuffer buf : bufs)
+            Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+            for (QpidByteBuffer buf : bufs)
             {
-                if(pos < offsetInMessage)
-                {
-                    final int remaining = buf.remaining();
-                    if(pos+ remaining >=offsetInMessage)
-                    {
-                        buf = buf.view(offsetInMessage-pos,size);
-
-                        content.add(buf);
-                        added += buf.remaining();
-                    }
-                    pos+= remaining;
-
-                }
-                else
-                {
-                    buf = buf.slice();
-                    if(buf.remaining() > (size-added))
-                    {
-                        buf.limit(size-added);
-                    }
-                    content.add(buf);
-                    added += buf.remaining();
-                }
-                if(added >= size)
-                {
-                    break;
-                }
+                content.add(buf.duplicate());
             }
-
             return content;
         }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Wed Oct 14 08:53:42 2015
@@ -84,7 +84,7 @@ public class StoredMemoryMessage<T exten
         return this;
     }
 
-    public int getContent(int offset, ByteBuffer dst)
+    public int getContent(ByteBuffer dst)
     {
         if(_content == null)
         {
@@ -92,12 +92,10 @@ public class StoredMemoryMessage<T exten
         }
         QpidByteBuffer src = _content.duplicate();
 
-        int oldPosition = src.position();
-
-        src.position(oldPosition + offset);
+        src.position(0);
 
         int length = dst.remaining() < src.remaining() ? dst.remaining() : src.remaining();
-        src.limit(oldPosition + length);
+        src.limit(length);
 
         src.get(dst);
 
@@ -105,23 +103,14 @@ public class StoredMemoryMessage<T exten
         return length;
     }
 
-
-    public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+    @Override
+    public Collection<QpidByteBuffer> getContent()
     {
         if(_content == null)
         {
             return null;
         }
-        QpidByteBuffer buf = _content.duplicate();
-
-        if(offsetInMessage != 0)
-        {
-            buf.position(offsetInMessage);
-            buf = buf.slice();
-        }
-
-        buf.limit(Math.min(size,buf.remaining()));
-        return Collections.singleton(buf);
+        return Collections.singleton(_content.duplicate());
     }
 
     public T getMetaData()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java Wed Oct 14 08:53:42 2015
@@ -31,9 +31,9 @@ public interface StoredMessage<M extends
 
     long getMessageNumber();
 
-    int getContent(int offsetInMessage, ByteBuffer dst);
+    int getContent(ByteBuffer dst);
 
-    Collection<QpidByteBuffer> getContent(int offsetInMessage, int size);
+    Collection<QpidByteBuffer> getContent();
 
     void remove();
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Wed Oct 14 08:53:42 2015
@@ -100,13 +100,13 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
-        public int getContent(ByteBuffer buf, int offset)
+        public int getContent(ByteBuffer buf)
         {
             return 0;
         }
 
         @Override
-        public Collection<QpidByteBuffer> getContent(int offset, int size)
+        public Collection<QpidByteBuffer> getContent()
         {
             return null;
         }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Wed Oct 14 08:53:42 2015
@@ -49,11 +49,13 @@ class MockServerMessage implements Serve
         this.persistent = persistent;
     }
 
+    @Override
     public boolean isPersistent()
     {
         return persistent;
     }
 
+    @Override
     public MessageReference newReference()
     {
         throw new UnsupportedOperationException();
@@ -77,38 +79,44 @@ class MockServerMessage implements Serve
         return false;
     }
 
+    @Override
     public long getSize()
     {
         throw new UnsupportedOperationException();
     }
 
+    @Override
     public String getInitialRoutingAddress()
     {
         throw new UnsupportedOperationException();
     }
 
+    @Override
     public AMQMessageHeader getMessageHeader()
     {
         throw new UnsupportedOperationException();
     }
 
+    @Override
     public StoredMessage getStoredMessage()
     {
         throw new UnsupportedOperationException();
     }
 
+    @Override
     public long getExpiration()
     {
         throw new UnsupportedOperationException();
     }
 
-    public int getContent(ByteBuffer buf, int offset)
+    @Override
+    public int getContent(ByteBuffer buf)
     {
         throw new UnsupportedOperationException();
     }
 
-
-    public Collection<QpidByteBuffer> getContent(int offset, int size)
+    @Override
+    public Collection<QpidByteBuffer> getContent()
     {
         throw new UnsupportedOperationException();
     }
@@ -119,11 +127,13 @@ class MockServerMessage implements Serve
         return null;
     }
 
+    @Override
     public long getArrivalTime()
     {
         throw new UnsupportedOperationException();
     }
 
+    @Override
     public long getMessageNumber()
     {
         return 0L;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Wed Oct 14 08:53:42 2015
@@ -79,22 +79,21 @@ public class MessageConverter_Internal_t
                     }
 
                     @Override
-                    public int getContent(int offsetInMessage, ByteBuffer dst)
+                    public int getContent(ByteBuffer dst)
                     {
-                        int size = messageContent.length - offsetInMessage;
+                        int size = messageContent.length;
                         if(dst.remaining() < size)
                         {
                             size = dst.remaining();
                         }
-                        ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size);
-                        dst.put(buf);
+                        dst.put(messageContent, 0, size);
                         return size;
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+                    public Collection<QpidByteBuffer> getContent()
                     {
-                        return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
+                        return Collections.singleton(QpidByteBuffer.wrap(messageContent));
                     }
 
                     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Wed Oct 14 08:53:42 2015
@@ -26,7 +26,6 @@ import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -86,15 +85,15 @@ public class MessageConverter_v0_10 impl
                     }
 
                     @Override
-                    public int getContent(int offsetInMessage, ByteBuffer dst)
+                    public int getContent(ByteBuffer dst)
                     {
-                        return serverMsg.getContent(dst, offsetInMessage);
+                        return serverMsg.getContent(dst);
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+                    public Collection<QpidByteBuffer> getContent()
                     {
-                        return serverMsg.getContent(offsetInMessage, size);
+                        return serverMsg.getContent();
                     }
 
                     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java Wed Oct 14 08:53:42 2015
@@ -61,7 +61,7 @@ public class MessageConverter_v0_10_to_I
     {
         final String mimeType = serverMessage.getMessageHeader().getMimeType();
         byte[] data = new byte[(int) serverMessage.getSize()];
-        serverMessage.getContent(ByteBuffer.wrap(data), 0);
+        serverMessage.getContent(ByteBuffer.wrap(data));
 
         Object body = convertMessageBody(mimeType, data);
         MessageProperties messageProps = serverMessage.getHeader().getMessageProperties();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Wed Oct 14 08:53:42 2015
@@ -83,6 +83,6 @@ public class MessageTransferMessage exte
 
     public Collection<QpidByteBuffer> getBody()
     {
-        return  getContent(0, (int)getSize());
+        return  getContent();
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Wed Oct 14 08:53:42 2015
@@ -91,22 +91,21 @@ public class MessageConverter_Internal_t
             }
 
             @Override
-            public int getContent(int offsetInMessage, ByteBuffer dst)
+            public int getContent(ByteBuffer dst)
             {
-                int size = messageContent.length - offsetInMessage;
+                int size = messageContent.length;
                 if(dst.remaining() < size)
                 {
                     size = dst.remaining();
                 }
-                ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size);
-                dst.put(buf);
+                dst.put(messageContent, 0, size);
                 return size;
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+            public Collection<QpidByteBuffer> getContent()
             {
-                return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
+                return Collections.singleton(QpidByteBuffer.wrap(messageContent));
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Wed Oct 14 08:53:42 2015
@@ -61,7 +61,7 @@ public class MessageConverter_v0_8_to_In
     {
         final String mimeType = serverMessage.getMessageHeader().getMimeType();
         byte[] data = new byte[(int) serverMessage.getSize()];
-        serverMessage.getContent(ByteBuffer.wrap(data), 0);
+        serverMessage.getContent(ByteBuffer.wrap(data));
 
         Object body = convertMessageBody(mimeType, data);
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Wed Oct 14 08:53:42 2015
@@ -22,10 +22,10 @@ package org.apache.qpid.server.protocol.
 
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,14 +111,14 @@ public class ProtocolOutputConverterImpl
         // straight through case
         boolean compressionSupported = _connection.isCompressionSupported();
 
-        Collection<QpidByteBuffer> buffers = null;
+        Collection<QpidByteBuffer> buffers = message.getContent();
 
         long length;
         if(msgCompressed
            && !compressionSupported
-           && ((buffers = message.getContent(0, bodySize)) != null)
+           && (buffers != null)
            && (modifiedContent = GZIPUtils.uncompressBufferToArray(
-                        ByteBufferUtils.combine(buffers))) != null)
+                ByteBufferUtils.combine(buffers))) != null)
         {
             BasicContentHeaderProperties modifiedProps =
                     new BasicContentHeaderProperties(contentHeaderBody.getProperties());
@@ -132,7 +132,7 @@ public class ProtocolOutputConverterImpl
                 && compressionSupported
                 && contentHeaderBody.getProperties().getEncoding()==null
                 && bodySize > _connection.getMessageCompressionThreshold()
-                && ((buffers = message.getContent(0, bodySize)) != null)
+                && (buffers != null)
                 && (modifiedContent = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(buffers))) != null)
         {
             BasicContentHeaderProperties modifiedProps =
@@ -145,7 +145,7 @@ public class ProtocolOutputConverterImpl
         }
         else
         {
-            writeMessageDeliveryUnchanged(message, contentHeaderBody, channelId, deliverBody, bodySize);
+            writeMessageDeliveryUnchanged(buffers, contentHeaderBody, channelId, deliverBody, bodySize);
 
             length = bodySize;
         }
@@ -170,35 +170,17 @@ public class ProtocolOutputConverterImpl
         bodySize = content.length;
         ContentHeaderBody modifiedHeaderBody =
                 new ContentHeaderBody(modifiedProps, bodySize);
-        final MessageContentSource wrappedSource = new MessageContentSource()
-        {
-            @Override
-            public int getContent(final ByteBuffer buf, final int offset)
-            {
-                int size = Math.min(buf.remaining(), content.length - offset);
-                buf.put(content, offset, size);
-                return size;
-            }
-
-            @Override
-            public Collection<QpidByteBuffer> getContent(final int offset, final int size)
-            {
-                return Collections.singleton(QpidByteBuffer.wrap(content, offset, size));
-            }
-
-            @Override
-            public long getSize()
-            {
-                return content.length;
-            }
-        };
-        writeMessageDeliveryUnchanged(wrappedSource, modifiedHeaderBody, channelId, deliverBody, bodySize);
+        writeMessageDeliveryUnchanged(Collections.singleton(QpidByteBuffer.wrap(content)),
+                                      modifiedHeaderBody, channelId, deliverBody, bodySize);
         return bodySize;
     }
 
-    private void writeMessageDeliveryUnchanged(final MessageContentSource message,
-                                               final ContentHeaderBody contentHeaderBody,
-                                               final int channelId, final AMQBody deliverBody, final int bodySize)
+
+    private void writeMessageDeliveryUnchanged(Collection<QpidByteBuffer> messageBuffers,
+                                               ContentHeaderBody contentHeaderBody,
+                                               int channelId,
+                                               AMQBody deliverBody,
+                                               int bodySize)
     {
         if (bodySize == 0)
         {
@@ -216,7 +198,7 @@ public class ProtocolOutputConverterImpl
 
             int writtenSize = capacity;
 
-            AMQBody firstContentBody = new MessageContentSourceBody(message, 0, capacity);
+            AMQBody firstContentBody = new MessageContentSourceBody(messageBuffers, 0, capacity);
 
             CompositeAMQBodyBlock
                     compositeBlock =
@@ -226,7 +208,7 @@ public class ProtocolOutputConverterImpl
             while (writtenSize < bodySize)
             {
                 capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
-                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+                AMQBody body = new MessageContentSourceBody(messageBuffers, writtenSize, capacity);
                 writtenSize += capacity;
 
                 writeFrame(new AMQFrame(channelId, body));
@@ -243,12 +225,47 @@ public class ProtocolOutputConverterImpl
     {
         public static final byte TYPE = 3;
         private final int _length;
-        private final MessageContentSource _message;
+        private final Collection<QpidByteBuffer> _contentBuffers;
         private final int _offset;
 
-        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+        public MessageContentSourceBody(Collection<QpidByteBuffer> bufs, int offset, int length)
         {
-            _message = message;
+            int pos = 0;
+            int added = 0;
+
+            List<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+            for(QpidByteBuffer buf : bufs)
+            {
+                if(pos < offset)
+                {
+                    final int remaining = buf.remaining();
+                    if(pos + remaining > offset)
+                    {
+                        buf = buf.view(offset-pos,length);
+
+                        content.add(buf);
+                        added += buf.remaining();
+                    }
+                    pos += remaining;
+
+                }
+                else
+                {
+                    buf = buf.slice();
+                    if(buf.remaining() > (length-added))
+                    {
+                        buf.limit(length-added);
+                    }
+                    content.add(buf);
+                    added += buf.remaining();
+                }
+                if(added >= length)
+                {
+                    break;
+                }
+            }
+
+            _contentBuffers = content;
             _offset = offset;
             _length = length;
         }
@@ -265,9 +282,7 @@ public class ProtocolOutputConverterImpl
 
         public void writePayload(DataOutput buffer) throws IOException
         {
-            Collection<QpidByteBuffer> bufs = _message.getContent(_offset, _length);
-
-            for(QpidByteBuffer buf : bufs)
+            for(QpidByteBuffer buf : _contentBuffers)
             {
                 if (buf.hasArray())
                 {
@@ -282,16 +297,15 @@ public class ProtocolOutputConverterImpl
 
                     buffer.write(data);
                 }
+                buf.dispose();
             }
         }
 
         @Override
         public long writePayload(final ByteBufferSender sender) throws IOException
         {
-
-            Collection<QpidByteBuffer> bufs = _message.getContent(_offset, _length);
             long size = 0l;
-            for(QpidByteBuffer buf : bufs)
+            for(QpidByteBuffer buf : _contentBuffers)
             {
                 size += buf.remaining();
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java Wed Oct 14 08:53:42 2015
@@ -62,7 +62,7 @@ public class MessageConverter_from_1_0
     public static Object convertBodyToObject(final Message_1_0 serverMessage)
     {
         byte[] data = new byte[(int) serverMessage.getSize()];
-        serverMessage.getStoredMessage().getContent(0, ByteBuffer.wrap(data));
+        serverMessage.getStoredMessage().getContent(ByteBuffer.wrap(data));
 
         SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY);
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Wed Oct 14 08:53:42 2015
@@ -208,7 +208,7 @@ public abstract class MessageConverter_t
     {
         final String mimeType = serverMessage.getMessageHeader().getMimeType();
         byte[] data = new byte[(int) serverMessage.getSize()];
-        serverMessage.getContent(ByteBuffer.wrap(data), 0);
+        serverMessage.getContent(ByteBuffer.wrap(data));
         byte[] uncompressed;
 
         if(Symbol.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING).equals(metaData.getPropertiesSection().getContentEncoding())
@@ -238,13 +238,12 @@ public abstract class MessageConverter_t
                         }
 
                         @Override
-                        public int getContent(int offsetInMessage, ByteBuffer dst)
+                        public int getContent(ByteBuffer dst)
                         {
                             QpidByteBuffer buf = allData.duplicate();
-                            buf.position(offsetInMessage);
-                            buf = buf.slice();
+                            buf.position(0);
                             int size;
-                            if(dst.remaining()<buf.remaining())
+                            if (dst.remaining() < buf.remaining())
                             {
                                 buf.limit(dst.remaining());
                                 size = dst.remaining();
@@ -253,16 +252,15 @@ public abstract class MessageConverter_t
                             {
                                 size = buf.remaining();
                             }
-                            buf.copyTo(dst);
+                            buf.get(dst);
                             buf.dispose();
                             return size;
                         }
 
                         @Override
-                        public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+                        public Collection<QpidByteBuffer> getContent()
                         {
-                            QpidByteBuffer buf = allData.view(offsetInMessage, Math.min(size,allData.remaining()-offsetInMessage));
-                            return Collections.singleton(buf);
+                            return Collections.singleton(allData.duplicate());
                         }
 
                         @Override

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Wed Oct 14 08:53:42 2015
@@ -71,8 +71,7 @@ public class Message_1_0 extends Abstrac
 
     private static Collection<QpidByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
     {
-        return storedMessage.getContent(0, Integer.MAX_VALUE);
-
+        return storedMessage.getContent();
     }
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Wed Oct 14 08:53:42 2015
@@ -90,22 +90,21 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public int getContent(int offsetInMessage, ByteBuffer dst)
+            public int getContent(ByteBuffer dst)
             {
-                int size = messageContent.length - offsetInMessage;
+                int size = messageContent.length;
                 if(dst.remaining() < size)
                 {
                     size = dst.remaining();
                 }
-                ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size);
-                dst.put(buf);
+                dst.put(messageContent, 0, size);
                 return size;
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+            public Collection<QpidByteBuffer> getContent()
             {
-                return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
+                return Collections.singleton(QpidByteBuffer.wrap(messageContent));
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Wed Oct 14 08:53:42 2015
@@ -194,15 +194,15 @@ public class MessageConverter_0_10_to_0_
             }
 
             @Override
-            public int getContent(int offsetInMessage, ByteBuffer dst)
+            public int getContent(ByteBuffer dst)
             {
-                return message.getContent(dst, offsetInMessage);
+                return message.getContent(dst);
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+            public Collection<QpidByteBuffer> getContent()
             {
-                return message.getContent(offsetInMessage, size);
+                return message.getContent();
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Wed Oct 14 08:53:42 2015
@@ -83,15 +83,15 @@ public class MessageConverter_0_8_to_0_1
             }
 
             @Override
-            public int getContent(int offsetInMessage, ByteBuffer dst)
+            public int getContent(ByteBuffer dst)
             {
-                return message_0_8.getContent(dst, offsetInMessage);
+                return message_0_8.getContent(dst);
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+            public Collection<QpidByteBuffer> getContent()
             {
-                return message_0_8.getContent(offsetInMessage, size);
+                return message_0_8.getContent();
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Wed Oct 14 08:53:42 2015
@@ -92,22 +92,21 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public int getContent(int offsetInMessage, ByteBuffer dst)
+            public int getContent(ByteBuffer dst)
             {
-                int size = messageContent.length - offsetInMessage;
+                int size = messageContent.length;
                 if(dst.remaining() < size)
                 {
                     size = dst.remaining();
                 }
-                ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size);
-                dst.put(buf);
+                dst.put(messageContent, 0, size);
                 return size;
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+            public Collection<QpidByteBuffer> getContent()
             {
-                return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
+                return Collections.singleton(QpidByteBuffer.wrap(messageContent));
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java Wed Oct 14 08:53:42 2015
@@ -218,7 +218,7 @@ public class ReportRunner<T>
     private static ReportableMessage convertMessage(QueueEntry entry)
     {
         final MessageInfoImpl messageInfo = new MessageInfoImpl(entry, true);
-        final Collection<QpidByteBuffer> contentBuffers = entry.getMessage().getContent(0, (int) entry.getSize());
+        final Collection<QpidByteBuffer> contentBuffers = entry.getMessage().getContent();
         final ByteBuffer content = ByteBufferUtils.combine(contentBuffers);
         for(QpidByteBuffer buf : contentBuffers)
         {

Modified: qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java (original)
+++ qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java Wed Oct 14 08:53:42 2015
@@ -403,7 +403,7 @@ public class QueueMBean extends AMQManag
         byte[] msgContent = new byte[bodySize];
 
         ByteBuffer buf = ByteBuffer.wrap(msgContent);
-        int stored = serverMsg.getContent(buf, 0);
+        int stored = serverMsg.getContent(buf);
 
         if(bodySize != stored)
         {

Modified: qpid/java/trunk/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java?rev=1708561&r1=1708560&r2=1708561&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java (original)
+++ qpid/java/trunk/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java Wed Oct 14 08:53:42 2015
@@ -445,18 +445,15 @@ public class QueueMBeanTest extends Qpid
                 Object[] args = invocation.getArguments();
 
                 //verify the arg types / expected values
-                assertEquals(2, args.length);
+                assertEquals(1, args.length);
                 assertTrue(args[0] instanceof ByteBuffer);
-                assertTrue(args[1] instanceof Integer);
 
                 ByteBuffer dest = (ByteBuffer) args[0];
-                int offset = (Integer) args[1];
-                assertEquals(0, offset);
 
                 dest.put(content);
                 return messageContentSize;
             }
-        }).when(serverMessage).getContent(Matchers.any(ByteBuffer.class), Matchers.anyInt());
+        }).when(serverMessage).getContent(Matchers.any(ByteBuffer.class));
 
         final QueueEntry entry = mock(QueueEntry.class);
         when(entry.getMessage()).thenReturn(serverMessage);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org