You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/11/10 18:20:01 UTC

svn commit: r1713702 - in /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid: amqp_1_0/codec/ amqp_1_0/framing/ amqp_1_0/transport/ amqp_1_0/type/transport/ server/protocol/v1_0/

Author: orudyy
Date: Tue Nov 10 17:20:00 2015
New Revision: 1713702

URL: http://svn.apache.org/viewvc?rev=1713702&view=rev
Log:
QPID-6832: Resolve memory leaks for AMQP 1.0 support in Broker

Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java Tue Nov 10 17:20:00 2015
@@ -41,20 +41,29 @@ public abstract class ArrayTypeConstruct
                                          size, in.remaining());
         }
         QpidByteBuffer dup = in.slice();
-        dup.limit(size);
-        in.position(in.position()+size);
-        int count = read(dup);
-        TypeConstructor t = handler.readConstructor(dup);
-        List rval = new ArrayList(count);
-        for(int i = 0; i < count; i++)
+
+        List rval;
+        try
         {
-            rval.add(t.construct(dup, handler));
+            dup.limit(size);
+            in.position(in.position()+size);
+            int count = read(dup);
+            TypeConstructor t = handler.readConstructor(dup);
+            rval = new ArrayList(count);
+            for(int i = 0; i < count; i++)
+            {
+                rval.add(t.construct(dup, handler));
+            }
+            if(dup.hasRemaining())
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             "Array incorrectly encoded, %d bytes remaining after decoding %d elements",
+                                             dup.remaining(), count);
+            }
         }
-        if(dup.hasRemaining())
+        finally
         {
-            throw new AmqpErrorException(AmqpError.DECODE_ERROR,
-                                         "Array incorrectly encoded, %d bytes remaining after decoding %d elements",
-                                         dup.remaining(), count);
+            dup.dispose();
         }
         if(rval.size() == 0)
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java Tue Nov 10 17:20:00 2015
@@ -56,26 +56,9 @@ public class BinaryTypeConstructor exten
             size = in.getInt();
         }
 
-        QpidByteBuffer inDup = in.slice();
-        inDup.limit(inDup.position()+size);
-
-        Binary binary;
-/*        if(isCopy && inDup.hasArray())
-        {
-            binary= new Binary(inDup.array(), inDup.arrayOffset()+inDup.position(),size);
-        }
-        else
-        {*/
-            byte[] buf = new byte[size];
-            inDup.get(buf);
-            binary = new Binary(buf);
-  /*      }*/
-
-        in.position(in.position()+size);
-
-
-        return binary;
-
+        byte[] buf = new byte[size];
+        in.get(buf);
+        return new Binary(buf);
     }
 
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java Tue Nov 10 17:20:00 2015
@@ -171,10 +171,6 @@ public class CompoundTypeConstructor ext
             count = in.getInt();
         }
 
-        QpidByteBuffer inDup = in.slice();
-
-        inDup.limit(size-getSize());
-
         CompoundTypeAssembler assembler = _assemblerFactory.newInstance();
 
         assembler.init(count);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java Tue Nov 10 17:20:00 2015
@@ -215,8 +215,6 @@ public abstract class CompoundWriter<V>
     {
 
         State state = State.FORMAT_CODE;
-        /*ByteBuffer origBuffer = buffer;
-        buffer = buffer.duplicate();*/
         int origPosition = buffer.position();
         int length ;
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java Tue Nov 10 17:20:00 2015
@@ -84,10 +84,12 @@ public class FrameWriter implements Valu
                     {
                         _typeWriter.setValue(_frame.getFrameBody());
 
+                        QpidByteBuffer qpidByteBuffer = remaining > 8
+                                ? buffer.duplicate().position(buffer.position() + 8)
+                                : QpidByteBuffer.wrap(EMPTY_BYTE_ARRAY);
 
-                        _size = _typeWriter.writeToBuffer(remaining > 8
-                                                          ? buffer.duplicate().position(buffer.position()+8)
-                                                          : QpidByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength;
+                        _size = _typeWriter.writeToBuffer(qpidByteBuffer) + 8 + payloadLength;
+                        qpidByteBuffer.dispose();
                     }
                     else
                     {
@@ -118,6 +120,7 @@ public class FrameWriter implements Valu
                                     int payloadUsed = buffer.remaining();
                                     dup.limit(payloadUsed);
                                     buffer.put(dup);
+                                    dup.dispose();
                                     _payload.position(_payload.position()+payloadUsed);
                                 }
                                 _state = State.PAYLOAD;
@@ -129,7 +132,17 @@ public class FrameWriter implements Valu
                                 if(payloadLength > 0)
                                 {
                                     buffer.put(_payload);
+
+                                }
+
+                                if (_payload != null)
+                                {
+                                    _payload.dispose();
+                                    _payload = null;
                                 }
+
+                                _frame = null;
+                                _typeWriter = null;
                                 _state = State.DONE;
                             }
 
@@ -217,6 +230,10 @@ public class FrameWriter implements Valu
                         _state = State.DONE;
                         _frame = null;
                         _typeWriter = null;
+                        if (_payload != null)
+                        {
+                            _payload.dispose();
+                        }
                         _payload = null;
 
                     }
@@ -228,12 +245,14 @@ public class FrameWriter implements Valu
                             _state = State.DONE;
                             _frame = null;
                             _typeWriter = null;
+                            _payload.dispose();
                             _payload = null;
                         }
                     }
 
             }
         }
+
         if(_size == -1)
         {
             _size =  _typeWriter.writeToBuffer(QpidByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + (_payload == null ? 0 : _payload.remaining());

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java Tue Nov 10 17:20:00 2015
@@ -64,19 +64,22 @@ public class StringTypeConstructor exten
         try
         {
             dup.limit(dup.position()+size);
+            CharBuffer charBuf = dup.decode(_charSet);
+
+            String str = charBuf.toString();
+
+            in.position(origPosition + size);
+
+            return str;
         }
         catch(IllegalArgumentException e)
         {
             throw new IllegalArgumentException("position: " + dup.position() + "size: " + size + " capacity: " + dup.capacity());
         }
-        CharBuffer charBuf = dup.decode(_charSet);
-
-        String str = charBuf.toString();
-
-        in.position(origPosition+size);
-
-        return str;
-
+        finally
+        {
+            dup.dispose();
+        }
     }
 
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java Tue Nov 10 17:20:00 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.amqp_1_0.codec;
 
-import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.Charset;
 import java.util.concurrent.ConcurrentHashMap;
@@ -70,7 +69,9 @@ public class SymbolTypeConstructor exten
         else
         {
             byte[] b = new byte[in.remaining()];
-            in.duplicate().get(b);
+            QpidByteBuffer dup = in.duplicate();
+            dup.get(b);
+            dup.dispose();
             binaryStr = new BinaryString(b, 0, b.length);
         }
 
@@ -80,13 +81,10 @@ public class SymbolTypeConstructor exten
             QpidByteBuffer dup = in.duplicate();
             dup.limit(in.position()+size);
             CharBuffer charBuf = dup.decode(ASCII);
-
+            dup.dispose();
 
             symbolVal = Symbol.getSymbol(charBuf.toString());
 
-
-
-
             byte[] data = new byte[size];
             in.get(data);
             binaryStr = new BinaryString(data, 0, size);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java Tue Nov 10 17:20:00 2015
@@ -144,7 +144,8 @@ public class FrameHandler implements Pro
                 case BUFFERING:
                     if(_buffer != null)
                     {
-                        if(in.remaining() < _buffer.remaining())
+                        final int bufferRemaining = _buffer.remaining();
+                        if(in.remaining() < bufferRemaining)
                         {
                             _buffer.put(in);
                             break;
@@ -152,12 +153,14 @@ public class FrameHandler implements Pro
                         else
                         {
                             QpidByteBuffer dup = in.duplicate();
-                            dup.limit(dup.position()+_buffer.remaining());
-                            int i = _buffer.remaining();
-                            int d = dup.remaining();
-                            in.position(in.position()+_buffer.remaining());
+                            dup.limit(dup.position() + bufferRemaining);
+
                             _buffer.put(dup);
+                            dup.dispose();
+
+                            in.position(in.position() + bufferRemaining);
                             oldIn = in;
+
                             _buffer.flip();
                             in = _buffer;
                             state = State.PARSING;
@@ -240,14 +243,10 @@ public class FrameHandler implements Pro
                         }
 
                         _connection.receive((short)channel,val);
+
                         reset();
-                        in = oldIn;
-                        oldIn = null;
-                        _buffer = null;
                         state = State.SIZE_0;
                         break;
-
-
                     }
                     catch (AmqpErrorException ex)
                     {
@@ -260,6 +259,14 @@ public class FrameHandler implements Pro
                         in.limit(inLimit);
                         throw e;
                     }
+                    finally
+                    {
+                        in.dispose();
+
+                        in = oldIn;
+                        oldIn = null;
+                        _buffer = null;
+                    }
             }
 
         }
@@ -270,6 +277,12 @@ public class FrameHandler implements Pro
         if(_state == State.ERROR)
         {
             _connection.handleError(frameParsingError);
+
+            if (_buffer != null)
+            {
+                _buffer.dispose();
+                _buffer = null;
+            }
         }
         }
         catch(RuntimeException e)

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java Tue Nov 10 17:20:00 2015
@@ -150,10 +150,10 @@ public class SASLFrameHandler implements
                         {
                             QpidByteBuffer dup = in.duplicate();
                             dup.limit(dup.position()+_buffer.remaining());
-                            int i = _buffer.remaining();
-                            int d = dup.remaining();
                             in.position(in.position()+_buffer.remaining());
                             _buffer.put(dup);
+                            dup.dispose();
+
                             oldIn = in;
                             _buffer.flip();
                             in = _buffer;
@@ -239,9 +239,6 @@ public class SASLFrameHandler implements
                         {
                             _connection.receive((short)channel,val);
                             reset();
-                            in = oldIn;
-                            oldIn = null;
-                            _buffer = null;
                             state = State.SIZE_0;
                             break;
                         }
@@ -253,6 +250,14 @@ public class SASLFrameHandler implements
                         state = State.ERROR;
                         frameParsingError = ex.getError();
                     }
+                    finally
+                    {
+                        in.dispose();
+
+                        in = oldIn;
+                        oldIn = null;
+                        _buffer = null;
+                    }
             }
 
         }
@@ -263,6 +268,11 @@ public class SASLFrameHandler implements
         if(_state == State.ERROR)
         {
             _connection.handleError(frameParsingError);
+            if (_buffer != null)
+            {
+                _buffer.dispose();
+                _buffer = null;
+            }
         }
             if(_connection.isSASLComplete())
             {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Tue Nov 10 17:20:00 2015
@@ -695,32 +695,35 @@ public class ConnectionEndpoint implemen
             int size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
             QpidByteBuffer payloadDup = payload == null ? null : payload.duplicate();
             int payloadSent = getMaxFrameSize() - (size + 9);
-            if (payloadSent < (payload == null ? 0 : payload.remaining()))
+            try
             {
-
-                if (body instanceof Transfer)
+                if (payloadSent < (payload == null ? 0 : payload.remaining()))
                 {
-                    ((Transfer) body).setMore(Boolean.TRUE);
-                }
 
-                writer = _describedTypeRegistry.getValueWriter(body);
-                size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
-                payloadSent = getMaxFrameSize() - (size + 9);
+                    if (body instanceof Transfer)
+                    {
+                        ((Transfer) body).setMore(Boolean.TRUE);
+                    }
+
+                    writer = _describedTypeRegistry.getValueWriter(body);
+                    size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
+                    payloadSent = getMaxFrameSize() - (size + 9);
 
-                try
-                {
                     payloadDup.limit(payloadDup.position() + payloadSent);
                 }
-                catch (NullPointerException npe)
+                else
                 {
-                    throw npe;
+                    payloadSent = payload == null ? 0 : payload.remaining();
                 }
+                _frameOutputHandler.send(AMQFrame.createAMQFrame(channel, body, payloadDup));
             }
-            else
+            finally
             {
-                payloadSent = payload == null ? 0 : payload.remaining();
+                if (payloadDup != null)
+                {
+                    payloadDup.dispose();
+                }
             }
-            _frameOutputHandler.send(AMQFrame.createAMQFrame(channel, body, payloadDup));
             return payloadSent;
         }
         else

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Tue Nov 10 17:20:00 2015
@@ -644,18 +644,25 @@ public class SessionEndpoint
             if(payload != null && payloadSent < payload.remaining() && payloadSent >= 0)
             {
                 payload = payload.duplicate();
-                payload.position(payload.position()+payloadSent);
+                try
+                {
+                    payload.position(payload.position()+payloadSent);
 
-                Transfer secondTransfer = new Transfer();
+                    Transfer secondTransfer = new Transfer();
 
-                secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
-                secondTransfer.setHandle(xfr.getHandle());
-                secondTransfer.setSettled(xfr.getSettled());
-                secondTransfer.setState(xfr.getState());
-                secondTransfer.setMessageFormat(xfr.getMessageFormat());
-                secondTransfer.setPayload(payload);
+                    secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
+                    secondTransfer.setHandle(xfr.getHandle());
+                    secondTransfer.setSettled(xfr.getSettled());
+                    secondTransfer.setState(xfr.getState());
+                    secondTransfer.setMessageFormat(xfr.getMessageFormat());
+                    secondTransfer.setPayload(payload);
 
-                sendTransfer(secondTransfer, endpoint, false);
+                    sendTransfer(secondTransfer, endpoint, false);
+                }
+                finally
+                {
+                    payload.dispose();
+                }
 
             }
         }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java Tue Nov 10 17:20:00 2015
@@ -38,7 +38,7 @@ public class Transfer
   {
 
 
-    private QpidByteBuffer _payload;
+    private volatile QpidByteBuffer _payload;
 
     private UnsignedInteger _handle;
 
@@ -296,5 +296,12 @@ public class Transfer
         return _payload;
     }
 
-
+    public void dispose()
+    {
+        if (_payload != null)
+        {
+            _payload.dispose();
+            _payload = null;
+        }
+    }
   }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Tue Nov 10 17:20:00 2015
@@ -315,6 +315,7 @@ public class AMQPConnection_1_0 extends
                 QpidByteBuffer dup = msg.duplicate();
                 byte[] data = new byte[dup.remaining()];
                 dup.get(data);
+                dup.dispose();
                 Binary bin = new Binary(data);
                 RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
             }
@@ -497,29 +498,35 @@ public class AMQPConnection_1_0 extends
 
             _frameWriter.setValue(amqFrame);
 
-            QpidByteBuffer dup = QpidByteBuffer.allocateDirect(_endpoint.getMaxFrameSize());
+            QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(_endpoint.getMaxFrameSize());
 
-            int size = _frameWriter.writeToBuffer(dup);
-            if (size > _endpoint.getMaxFrameSize())
+            try
             {
-                throw new OversizeFrameException(amqFrame, size);
-            }
+                int size = _frameWriter.writeToBuffer(buffer);
+                if (size > _endpoint.getMaxFrameSize())
+                {
+                    throw new OversizeFrameException(amqFrame, size);
+                }
 
-            dup.flip();
+                buffer.flip();
+
+                if (RAW_LOGGER.isDebugEnabled())
+                {
+                    QpidByteBuffer dup = buffer.duplicate();
+                    byte[] data = new byte[dup.remaining()];
+                    dup.get(data);
+                    dup.dispose();
+                    Binary bin = new Binary(data);
+                    RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
+                }
 
-            if (RAW_LOGGER.isDebugEnabled())
+                getSender().send(buffer);
+                getSender().flush();
+            }
+            finally
             {
-                QpidByteBuffer dup2 = dup.duplicate();
-                byte[] data = new byte[dup2.remaining()];
-                dup2.get(data);
-                Binary bin = new Binary(data);
-                RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
+                buffer.dispose();
             }
-
-            getSender().send(dup);
-            getSender().flush();
-
-
         }
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Nov 10 17:20:00 2015
@@ -137,141 +137,146 @@ class ConsumerTarget_1_0 extends Abstrac
         }
 
         Transfer transfer = new Transfer();
-        //TODO
-
-        Collection<QpidByteBuffer> fragments = message.getFragments();
-        QpidByteBuffer payload;
-        if(fragments.size() == 1)
-        {
-            payload = fragments.iterator().next();
-        }
-        else
+        try
         {
-            int size = 0;
-            for(QpidByteBuffer fragment : fragments)
+            QpidByteBuffer payload = null;
+            //TODO
+            Collection<QpidByteBuffer> fragments = message.getFragments();
+            if(fragments.size() == 1)
             {
-                size += fragment.remaining();
+                payload = fragments.iterator().next();
             }
-
-            payload = QpidByteBuffer.allocateDirect(size);
-
-            for(QpidByteBuffer fragment : fragments)
+            else
             {
-                payload.put(fragment.duplicate());
-            }
+                int size = 0;
+                for(QpidByteBuffer fragment : fragments)
+                {
+                    size += fragment.remaining();
+                }
 
-            payload.flip();
-        }
+                payload = QpidByteBuffer.allocateDirect(size);
 
-        if(entry.getDeliveryCount() != 0)
-        {
-            payload = payload.duplicate();
-            ValueHandler valueHandler = new ValueHandler(_typeRegistry);
+                for(QpidByteBuffer fragment : fragments)
+                {
+                    payload.put(fragment);
+                    fragment.dispose();
+                }
+
+                payload.flip();
+            }
 
-            Header oldHeader = null;
-            try
+            if(entry.getDeliveryCount() != 0)
             {
-                Object value = valueHandler.parse(payload);
-                if(value instanceof Header)
+                ValueHandler valueHandler = new ValueHandler(_typeRegistry);
+
+                Header oldHeader = null;
+                try
                 {
-                    oldHeader = (Header) value;
+                    Object value = valueHandler.parse(payload);
+                    if(value instanceof Header)
+                    {
+                        oldHeader = (Header) value;
+                    }
+                    else
+                    {
+                        payload.position(0);
+                    }
                 }
-                else
+                catch (AmqpErrorException e)
                 {
-                    payload.position(0);
+                    //TODO
+                    throw new ConnectionScopedRuntimeException(e);
                 }
-            }
-            catch (AmqpErrorException e)
-            {
-                //TODO
-                throw new ConnectionScopedRuntimeException(e);
-            }
 
-            Header header = new Header();
-            if(oldHeader != null)
-            {
-                header.setDurable(oldHeader.getDurable());
-                header.setPriority(oldHeader.getPriority());
-                header.setTtl(oldHeader.getTtl());
+                Header header = new Header();
+                if(oldHeader != null)
+                {
+                    header.setDurable(oldHeader.getDurable());
+                    header.setPriority(oldHeader.getPriority());
+                    header.setTtl(oldHeader.getTtl());
+                }
+                header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
+                _sectionEncoder.reset();
+                _sectionEncoder.encodeObject(header);
+                Binary encodedHeader = _sectionEncoder.getEncoding();
+
+                QpidByteBuffer oldPayload = payload;
+                payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
+                payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
+                payload.put(oldPayload);
+                oldPayload.dispose();
+                payload.flip();
             }
-            header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
-            _sectionEncoder.reset();
-            _sectionEncoder.encodeObject(header);
-            Binary encodedHeader = _sectionEncoder.getEncoding();
-
-            QpidByteBuffer oldPayload = payload;
-            payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
-            payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
-            payload.put(oldPayload);
-            payload.flip();
-        }
 
-        transfer.setPayload(payload);
-        byte[] data = new byte[8];
-        ByteBuffer.wrap(data).putLong(_deliveryTag++);
-        final Binary tag = new Binary(data);
+            transfer.setPayload(payload);
+            byte[] data = new byte[8];
+            ByteBuffer.wrap(data).putLong(_deliveryTag++);
+            final Binary tag = new Binary(data);
 
-        transfer.setDeliveryTag(tag);
+            transfer.setDeliveryTag(tag);
 
-        synchronized(_link.getLock())
-        {
-            if(_link.isAttached())
+            synchronized(_link.getLock())
             {
-                if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
+                if(_link.isAttached())
                 {
-                    transfer.setSettled(true);
-                }
-                else
-                {
-                    UnsettledAction action = _acquires
-                                             ? new DispositionAction(tag, entry)
-                                             : new DoNothingAction(tag, entry);
+                    if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
+                    {
+                        transfer.setSettled(true);
+                    }
+                    else
+                    {
+                        UnsettledAction action = _acquires
+                                                 ? new DispositionAction(tag, entry)
+                                                 : new DoNothingAction(tag, entry);
 
-                    _link.addUnsettled(tag, action, entry);
-                }
+                        _link.addUnsettled(tag, action, entry);
+                    }
 
-                if(_transactionId != null)
-                {
-                    TransactionalState state = new TransactionalState();
-                    state.setTxnId(_transactionId);
-                    transfer.setState(state);
-                }
-                // TODO - need to deal with failure here
-                if(_acquires && _transactionId != null)
-                {
-                    ServerTransaction txn = _link.getTransaction(_transactionId);
-                    if(txn != null)
+                    if(_transactionId != null)
+                    {
+                        TransactionalState state = new TransactionalState();
+                        state.setTxnId(_transactionId);
+                        transfer.setState(state);
+                    }
+                    // TODO - need to deal with failure here
+                    if(_acquires && _transactionId != null)
                     {
-                        txn.addPostTransactionAction(new ServerTransaction.Action(){
+                        ServerTransaction txn = _link.getTransaction(_transactionId);
+                        if(txn != null)
+                        {
+                            txn.addPostTransactionAction(new ServerTransaction.Action(){
 
-                            public void postCommit()
-                            {
-                                //To change body of implemented methods use File | Settings | File Templates.
-                            }
+                                public void postCommit()
+                                {
+                                }
 
-                            public void onRollback()
-                            {
-                                if(entry.isAcquiredBy(getConsumer()))
+                                public void onRollback()
                                 {
-                                    entry.release();
-                                    _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
+                                    if(entry.isAcquiredBy(getConsumer()))
+                                    {
+                                        entry.release();
+                                        _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
 
 
+                                    }
                                 }
-                            }
-                        });
-                    }
+                            });
+                        }
 
+                    }
+                    getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
+                    getEndpoint().transfer(transfer);
+                }
+                else
+                {
+                    entry.release();
                 }
-                getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
-                getEndpoint().transfer(transfer);
-            }
-            else
-            {
-                entry.release();
             }
         }
-
+        finally
+        {
+            transfer.dispose();
+        }
     }
 
     public void flushBatched()

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Tue Nov 10 17:20:00 2015
@@ -70,7 +70,7 @@ public class MessageMetaData_1_0 impleme
     private Map _appProperties;
     private Map _footer;
 
-    private List<QpidByteBuffer> _encodedSections = new ArrayList<>(3);
+    private volatile List<QpidByteBuffer> _encodedSections = new ArrayList<>(3);
 
     private volatile QpidByteBuffer _encoded;
     private MessageHeader_1_0 _messageHeader;
@@ -180,7 +180,9 @@ public class MessageMetaData_1_0 impleme
             src = QpidByteBuffer.allocateDirect(size);
             for(QpidByteBuffer buf : fragments)
             {
-                src.put(buf.duplicate());
+                QpidByteBuffer duplicate = buf.duplicate();
+                src.put(duplicate);
+                duplicate.dispose();
             }
             src.flip();
 
@@ -273,26 +275,9 @@ public class MessageMetaData_1_0 impleme
             }
 
 
-            int pos = 0;
             for(QpidByteBuffer buf : fragments)
             {
-/*
-                if(pos < startBarePos)
-                {
-                    if(pos + buf.remaining() > startBarePos)
-                    {
-                        ByteBuffer dup = buf.duplicate();
-                        dup.position(dup.position()+startBarePos-pos);
-                        dup.slice();
-                        encodedSections.add(dup);
-                    }
-                }
-                else
-*/
-                {
-                    encodedSections.add(buf.duplicate());
-                }
-                pos += buf.remaining();
+                encodedSections.add(buf.duplicate());
             }
 
             return sections;
@@ -302,6 +287,10 @@ public class MessageMetaData_1_0 impleme
             _logger.error("Decoding read section error", e);
             throw new IllegalArgumentException(e);
         }
+        finally
+        {
+            src.dispose();
+        }
     }
 
 
@@ -329,7 +318,9 @@ public class MessageMetaData_1_0 impleme
 
         for(QpidByteBuffer bin : _encodedSections)
         {
-            buf.put(bin.duplicate());
+            QpidByteBuffer duplicate = bin.duplicate();
+            buf.put(duplicate);
+            duplicate.dispose();
         }
 
         return buf;
@@ -402,6 +393,11 @@ public class MessageMetaData_1_0 impleme
     @Override
     public void dispose()
     {
+        for(QpidByteBuffer bin : _encodedSections)
+        {
+            bin.dispose();
+        }
+        _encodedSections = null;
         _encoded.dispose();
         _encoded = null;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Tue Nov 10 17:20:00 2015
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 
-import java.lang.ref.SoftReference;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -42,7 +41,6 @@ public class Message_1_0 extends Abstrac
             .registerSecurityLayer();
     public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<Section>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY));
 
-    private volatile SoftReference<Collection<QpidByteBuffer>> _fragmentsRef;
     private long _arrivalTime;
     private final long _size;
 
@@ -50,37 +48,14 @@ public class Message_1_0 extends Abstrac
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
     {
         super(storedMessage, null);
-        final Collection<QpidByteBuffer> fragments = restoreFragments(getStoredMessage());
-        _fragmentsRef = new SoftReference<>(fragments);
-        _size = calculateSize(fragments);
-    }
-
-    private long calculateSize(final Collection<QpidByteBuffer> fragments)
-    {
-
-        long size = 0l;
-        if(fragments != null)
-        {
-            for(QpidByteBuffer buf : fragments)
-            {
-                size += buf.remaining();
-            }
-        }
-        return size;
-    }
-
-    private static Collection<QpidByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
-    {
-        return storedMessage.getContent();
+        _size = storedMessage.getMetaData().getContentSize();
     }
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
-                       final Collection<QpidByteBuffer> fragments,
                        final Object connectionReference)
     {
         super(storedMessage, connectionReference);
-        _fragmentsRef = new SoftReference<>(fragments);
-        _size = calculateSize(fragments);
+        _size = storedMessage.getMetaData().getContentSize();
         _arrivalTime = System.currentTimeMillis();
     }
 
@@ -125,14 +100,7 @@ public class Message_1_0 extends Abstrac
 
     public Collection<QpidByteBuffer> getFragments()
     {
-
-        Collection<QpidByteBuffer> fragments = _fragmentsRef.get();
-        if(fragments == null)
-        {
-            fragments = restoreFragments(getStoredMessage());
-            _fragmentsRef = new SoftReference<>(fragments);
-        }
-        return fragments;
+        return getContent();
     }
 
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Tue Nov 10 17:20:00 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -90,14 +89,15 @@ public class ReceivingLink_1_0 implement
 
         List<QpidByteBuffer> fragments = null;
 
-
+        org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
+        final Binary deliveryTag = xfr.getDeliveryTag();
 
         if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
         {
             _incompleteMessage = new ArrayList<Transfer>();
             _incompleteMessage.add(xfr);
             _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
-            _messageDeliveryTag = xfr.getDeliveryTag();
+            _messageDeliveryTag = deliveryTag;
             return;
         }
         else if(_incompleteMessage != null)
@@ -110,9 +110,11 @@ public class ReceivingLink_1_0 implement
             }
 
             fragments = new ArrayList<QpidByteBuffer>(_incompleteMessage.size());
+
             for(Transfer t : _incompleteMessage)
             {
-                fragments.add(t.getPayload());
+                fragments.add(t.getPayload().duplicate());
+                t.dispose();
             }
             _incompleteMessage=null;
 
@@ -120,8 +122,9 @@ public class ReceivingLink_1_0 implement
         else
         {
             _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
-            _messageDeliveryTag = xfr.getDeliveryTag();
-            fragments = Collections.singletonList(xfr.getPayload());
+            _messageDeliveryTag = deliveryTag;
+            fragments = Collections.singletonList(xfr.getPayload().duplicate());
+            xfr.dispose();
         }
 
         if(_resumedMessage)
@@ -151,19 +154,25 @@ public class ReceivingLink_1_0 implement
             mmd = new MessageMetaData_1_0(fragments.toArray(new QpidByteBuffer[fragments.size()]),
                     _sectionDecoder,
                     immutableSections);
-
             MessageHandle<MessageMetaData_1_0> handle = _vhost.getMessageStore().addMessage(mmd);
 
             for(QpidByteBuffer bareMessageBuf : immutableSections)
             {
-                handle.addContent(bareMessageBuf.duplicate());
+                handle.addContent(bareMessageBuf);
             }
             final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
-            Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
+            Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
+
+
+            for(QpidByteBuffer fragment: fragments)
+            {
+                fragment.dispose();
+            }
+            fragments = null;
+
             MessageReference<Message_1_0> reference = message.newReference();
 
             Binary transactionId = null;
-            org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
             if(xfrState != null)
             {
                 if(xfrState instanceof TransactionalState)
@@ -203,8 +212,6 @@ public class ReceivingLink_1_0 implement
 
             boolean settled = transaction instanceof AutoCommitTransaction && ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
 
-            final Binary deliveryTag = xfr.getDeliveryTag();
-
             if(!settled)
             {
                 _unsettledMap.put(deliveryTag, outcome);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java Tue Nov 10 17:20:00 2015
@@ -34,7 +34,7 @@ import org.apache.qpid.amqp_1_0.transpor
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
 import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
@@ -80,6 +80,7 @@ public class TxnCoordinatorLink_1_0 impl
 
         QpidByteBuffer payload = null;
 
+        final Binary deliveryTag = xfr.getDeliveryTag();
 
         if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
         {
@@ -104,6 +105,7 @@ public class TxnCoordinatorLink_1_0 impl
             for(Transfer t : _incompleteMessage)
             {
                 payload.put(t.getPayload().duplicate());
+                t.dispose();
             }
             payload.flip();
             _incompleteMessage=null;
@@ -111,10 +113,10 @@ public class TxnCoordinatorLink_1_0 impl
         }
         else
         {
-            payload = xfr.getPayload();
+            payload = xfr.getPayload().duplicate();
+            xfr.dispose();
         }
 
-
         // Only interested int he amqp-value section that holds the message to the coordinator
         try
         {
@@ -126,6 +128,7 @@ public class TxnCoordinatorLink_1_0 impl
                 {
                     Object command = ((AmqpValue) section).getValue();
 
+
                     if(command instanceof Declare)
                     {
                         Integer txnId = Integer.valueOf(0);
@@ -143,16 +146,15 @@ public class TxnCoordinatorLink_1_0 impl
 
 
                         state.setTxnId(_session.integerToBinary(txnId));
-                        _endpoint.updateDisposition(xfr.getDeliveryTag(), state, true);
+                        _endpoint.updateDisposition(deliveryTag, state, true);
 
                     }
                     else if(command instanceof Discharge)
                     {
                         Discharge discharge = (Discharge) command;
 
-                        DeliveryState state = xfr.getState();
                         discharge(_session.binaryToInteger(discharge.getTxnId()), discharge.getFail());
-                        _endpoint.updateDisposition(xfr.getDeliveryTag(), new Accepted(), true);
+                        _endpoint.updateDisposition(deliveryTag, new Accepted(), true);
 
                     }
                 }
@@ -164,6 +166,13 @@ public class TxnCoordinatorLink_1_0 impl
             //TODO
             _logger.error("AMQP error", e);
         }
+        finally
+        {
+            if (payload != null)
+            {
+                payload.dispose();
+            }
+        }
 
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org