You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/11/10 18:20:01 UTC
svn commit: r1713702 - in
/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid:
amqp_1_0/codec/ amqp_1_0/framing/ amqp_1_0/transport/
amqp_1_0/type/transport/ server/protocol/v1_0/
Author: orudyy
Date: Tue Nov 10 17:20:00 2015
New Revision: 1713702
URL: http://svn.apache.org/viewvc?rev=1713702&view=rev
Log:
QPID-6832: Resolve memory leaks for AMQP 1.0 support in Broker
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java Tue Nov 10 17:20:00 2015
@@ -41,20 +41,29 @@ public abstract class ArrayTypeConstruct
size, in.remaining());
}
QpidByteBuffer dup = in.slice();
- dup.limit(size);
- in.position(in.position()+size);
- int count = read(dup);
- TypeConstructor t = handler.readConstructor(dup);
- List rval = new ArrayList(count);
- for(int i = 0; i < count; i++)
+
+ List rval;
+ try
{
- rval.add(t.construct(dup, handler));
+ dup.limit(size);
+ in.position(in.position()+size);
+ int count = read(dup);
+ TypeConstructor t = handler.readConstructor(dup);
+ rval = new ArrayList(count);
+ for(int i = 0; i < count; i++)
+ {
+ rval.add(t.construct(dup, handler));
+ }
+ if(dup.hasRemaining())
+ {
+ throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+ "Array incorrectly encoded, %d bytes remaining after decoding %d elements",
+ dup.remaining(), count);
+ }
}
- if(dup.hasRemaining())
+ finally
{
- throw new AmqpErrorException(AmqpError.DECODE_ERROR,
- "Array incorrectly encoded, %d bytes remaining after decoding %d elements",
- dup.remaining(), count);
+ dup.dispose();
}
if(rval.size() == 0)
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java Tue Nov 10 17:20:00 2015
@@ -56,26 +56,9 @@ public class BinaryTypeConstructor exten
size = in.getInt();
}
- QpidByteBuffer inDup = in.slice();
- inDup.limit(inDup.position()+size);
-
- Binary binary;
-/* if(isCopy && inDup.hasArray())
- {
- binary= new Binary(inDup.array(), inDup.arrayOffset()+inDup.position(),size);
- }
- else
- {*/
- byte[] buf = new byte[size];
- inDup.get(buf);
- binary = new Binary(buf);
- /* }*/
-
- in.position(in.position()+size);
-
-
- return binary;
-
+ byte[] buf = new byte[size];
+ in.get(buf);
+ return new Binary(buf);
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java Tue Nov 10 17:20:00 2015
@@ -171,10 +171,6 @@ public class CompoundTypeConstructor ext
count = in.getInt();
}
- QpidByteBuffer inDup = in.slice();
-
- inDup.limit(size-getSize());
-
CompoundTypeAssembler assembler = _assemblerFactory.newInstance();
assembler.init(count);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java Tue Nov 10 17:20:00 2015
@@ -215,8 +215,6 @@ public abstract class CompoundWriter<V>
{
State state = State.FORMAT_CODE;
- /*ByteBuffer origBuffer = buffer;
- buffer = buffer.duplicate();*/
int origPosition = buffer.position();
int length ;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java Tue Nov 10 17:20:00 2015
@@ -84,10 +84,12 @@ public class FrameWriter implements Valu
{
_typeWriter.setValue(_frame.getFrameBody());
+ QpidByteBuffer qpidByteBuffer = remaining > 8
+ ? buffer.duplicate().position(buffer.position() + 8)
+ : QpidByteBuffer.wrap(EMPTY_BYTE_ARRAY);
- _size = _typeWriter.writeToBuffer(remaining > 8
- ? buffer.duplicate().position(buffer.position()+8)
- : QpidByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength;
+ _size = _typeWriter.writeToBuffer(qpidByteBuffer) + 8 + payloadLength;
+ qpidByteBuffer.dispose();
}
else
{
@@ -118,6 +120,7 @@ public class FrameWriter implements Valu
int payloadUsed = buffer.remaining();
dup.limit(payloadUsed);
buffer.put(dup);
+ dup.dispose();
_payload.position(_payload.position()+payloadUsed);
}
_state = State.PAYLOAD;
@@ -129,7 +132,17 @@ public class FrameWriter implements Valu
if(payloadLength > 0)
{
buffer.put(_payload);
+
+ }
+
+ if (_payload != null)
+ {
+ _payload.dispose();
+ _payload = null;
}
+
+ _frame = null;
+ _typeWriter = null;
_state = State.DONE;
}
@@ -217,6 +230,10 @@ public class FrameWriter implements Valu
_state = State.DONE;
_frame = null;
_typeWriter = null;
+ if (_payload != null)
+ {
+ _payload.dispose();
+ }
_payload = null;
}
@@ -228,12 +245,14 @@ public class FrameWriter implements Valu
_state = State.DONE;
_frame = null;
_typeWriter = null;
+ _payload.dispose();
_payload = null;
}
}
}
}
+
if(_size == -1)
{
_size = _typeWriter.writeToBuffer(QpidByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + (_payload == null ? 0 : _payload.remaining());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java Tue Nov 10 17:20:00 2015
@@ -64,19 +64,22 @@ public class StringTypeConstructor exten
try
{
dup.limit(dup.position()+size);
+ CharBuffer charBuf = dup.decode(_charSet);
+
+ String str = charBuf.toString();
+
+ in.position(origPosition + size);
+
+ return str;
}
catch(IllegalArgumentException e)
{
throw new IllegalArgumentException("position: " + dup.position() + "size: " + size + " capacity: " + dup.capacity());
}
- CharBuffer charBuf = dup.decode(_charSet);
-
- String str = charBuf.toString();
-
- in.position(origPosition+size);
-
- return str;
-
+ finally
+ {
+ dup.dispose();
+ }
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java Tue Nov 10 17:20:00 2015
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.amqp_1_0.codec;
-import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.ConcurrentHashMap;
@@ -70,7 +69,9 @@ public class SymbolTypeConstructor exten
else
{
byte[] b = new byte[in.remaining()];
- in.duplicate().get(b);
+ QpidByteBuffer dup = in.duplicate();
+ dup.get(b);
+ dup.dispose();
binaryStr = new BinaryString(b, 0, b.length);
}
@@ -80,13 +81,10 @@ public class SymbolTypeConstructor exten
QpidByteBuffer dup = in.duplicate();
dup.limit(in.position()+size);
CharBuffer charBuf = dup.decode(ASCII);
-
+ dup.dispose();
symbolVal = Symbol.getSymbol(charBuf.toString());
-
-
-
byte[] data = new byte[size];
in.get(data);
binaryStr = new BinaryString(data, 0, size);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java Tue Nov 10 17:20:00 2015
@@ -144,7 +144,8 @@ public class FrameHandler implements Pro
case BUFFERING:
if(_buffer != null)
{
- if(in.remaining() < _buffer.remaining())
+ final int bufferRemaining = _buffer.remaining();
+ if(in.remaining() < bufferRemaining)
{
_buffer.put(in);
break;
@@ -152,12 +153,14 @@ public class FrameHandler implements Pro
else
{
QpidByteBuffer dup = in.duplicate();
- dup.limit(dup.position()+_buffer.remaining());
- int i = _buffer.remaining();
- int d = dup.remaining();
- in.position(in.position()+_buffer.remaining());
+ dup.limit(dup.position() + bufferRemaining);
+
_buffer.put(dup);
+ dup.dispose();
+
+ in.position(in.position() + bufferRemaining);
oldIn = in;
+
_buffer.flip();
in = _buffer;
state = State.PARSING;
@@ -240,14 +243,10 @@ public class FrameHandler implements Pro
}
_connection.receive((short)channel,val);
+
reset();
- in = oldIn;
- oldIn = null;
- _buffer = null;
state = State.SIZE_0;
break;
-
-
}
catch (AmqpErrorException ex)
{
@@ -260,6 +259,14 @@ public class FrameHandler implements Pro
in.limit(inLimit);
throw e;
}
+ finally
+ {
+ in.dispose();
+
+ in = oldIn;
+ oldIn = null;
+ _buffer = null;
+ }
}
}
@@ -270,6 +277,12 @@ public class FrameHandler implements Pro
if(_state == State.ERROR)
{
_connection.handleError(frameParsingError);
+
+ if (_buffer != null)
+ {
+ _buffer.dispose();
+ _buffer = null;
+ }
}
}
catch(RuntimeException e)
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java Tue Nov 10 17:20:00 2015
@@ -150,10 +150,10 @@ public class SASLFrameHandler implements
{
QpidByteBuffer dup = in.duplicate();
dup.limit(dup.position()+_buffer.remaining());
- int i = _buffer.remaining();
- int d = dup.remaining();
in.position(in.position()+_buffer.remaining());
_buffer.put(dup);
+ dup.dispose();
+
oldIn = in;
_buffer.flip();
in = _buffer;
@@ -239,9 +239,6 @@ public class SASLFrameHandler implements
{
_connection.receive((short)channel,val);
reset();
- in = oldIn;
- oldIn = null;
- _buffer = null;
state = State.SIZE_0;
break;
}
@@ -253,6 +250,14 @@ public class SASLFrameHandler implements
state = State.ERROR;
frameParsingError = ex.getError();
}
+ finally
+ {
+ in.dispose();
+
+ in = oldIn;
+ oldIn = null;
+ _buffer = null;
+ }
}
}
@@ -263,6 +268,11 @@ public class SASLFrameHandler implements
if(_state == State.ERROR)
{
_connection.handleError(frameParsingError);
+ if (_buffer != null)
+ {
+ _buffer.dispose();
+ _buffer = null;
+ }
}
if(_connection.isSASLComplete())
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Tue Nov 10 17:20:00 2015
@@ -695,32 +695,35 @@ public class ConnectionEndpoint implemen
int size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
QpidByteBuffer payloadDup = payload == null ? null : payload.duplicate();
int payloadSent = getMaxFrameSize() - (size + 9);
- if (payloadSent < (payload == null ? 0 : payload.remaining()))
+ try
{
-
- if (body instanceof Transfer)
+ if (payloadSent < (payload == null ? 0 : payload.remaining()))
{
- ((Transfer) body).setMore(Boolean.TRUE);
- }
- writer = _describedTypeRegistry.getValueWriter(body);
- size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
- payloadSent = getMaxFrameSize() - (size + 9);
+ if (body instanceof Transfer)
+ {
+ ((Transfer) body).setMore(Boolean.TRUE);
+ }
+
+ writer = _describedTypeRegistry.getValueWriter(body);
+ size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
+ payloadSent = getMaxFrameSize() - (size + 9);
- try
- {
payloadDup.limit(payloadDup.position() + payloadSent);
}
- catch (NullPointerException npe)
+ else
{
- throw npe;
+ payloadSent = payload == null ? 0 : payload.remaining();
}
+ _frameOutputHandler.send(AMQFrame.createAMQFrame(channel, body, payloadDup));
}
- else
+ finally
{
- payloadSent = payload == null ? 0 : payload.remaining();
+ if (payloadDup != null)
+ {
+ payloadDup.dispose();
+ }
}
- _frameOutputHandler.send(AMQFrame.createAMQFrame(channel, body, payloadDup));
return payloadSent;
}
else
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Tue Nov 10 17:20:00 2015
@@ -644,18 +644,25 @@ public class SessionEndpoint
if(payload != null && payloadSent < payload.remaining() && payloadSent >= 0)
{
payload = payload.duplicate();
- payload.position(payload.position()+payloadSent);
+ try
+ {
+ payload.position(payload.position()+payloadSent);
- Transfer secondTransfer = new Transfer();
+ Transfer secondTransfer = new Transfer();
- secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
- secondTransfer.setHandle(xfr.getHandle());
- secondTransfer.setSettled(xfr.getSettled());
- secondTransfer.setState(xfr.getState());
- secondTransfer.setMessageFormat(xfr.getMessageFormat());
- secondTransfer.setPayload(payload);
+ secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
+ secondTransfer.setHandle(xfr.getHandle());
+ secondTransfer.setSettled(xfr.getSettled());
+ secondTransfer.setState(xfr.getState());
+ secondTransfer.setMessageFormat(xfr.getMessageFormat());
+ secondTransfer.setPayload(payload);
- sendTransfer(secondTransfer, endpoint, false);
+ sendTransfer(secondTransfer, endpoint, false);
+ }
+ finally
+ {
+ payload.dispose();
+ }
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java Tue Nov 10 17:20:00 2015
@@ -38,7 +38,7 @@ public class Transfer
{
- private QpidByteBuffer _payload;
+ private volatile QpidByteBuffer _payload;
private UnsignedInteger _handle;
@@ -296,5 +296,12 @@ public class Transfer
return _payload;
}
-
+ public void dispose()
+ {
+ if (_payload != null)
+ {
+ _payload.dispose();
+ _payload = null;
+ }
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Tue Nov 10 17:20:00 2015
@@ -315,6 +315,7 @@ public class AMQPConnection_1_0 extends
QpidByteBuffer dup = msg.duplicate();
byte[] data = new byte[dup.remaining()];
dup.get(data);
+ dup.dispose();
Binary bin = new Binary(data);
RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
}
@@ -497,29 +498,35 @@ public class AMQPConnection_1_0 extends
_frameWriter.setValue(amqFrame);
- QpidByteBuffer dup = QpidByteBuffer.allocateDirect(_endpoint.getMaxFrameSize());
+ QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(_endpoint.getMaxFrameSize());
- int size = _frameWriter.writeToBuffer(dup);
- if (size > _endpoint.getMaxFrameSize())
+ try
{
- throw new OversizeFrameException(amqFrame, size);
- }
+ int size = _frameWriter.writeToBuffer(buffer);
+ if (size > _endpoint.getMaxFrameSize())
+ {
+ throw new OversizeFrameException(amqFrame, size);
+ }
- dup.flip();
+ buffer.flip();
+
+ if (RAW_LOGGER.isDebugEnabled())
+ {
+ QpidByteBuffer dup = buffer.duplicate();
+ byte[] data = new byte[dup.remaining()];
+ dup.get(data);
+ dup.dispose();
+ Binary bin = new Binary(data);
+ RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
+ }
- if (RAW_LOGGER.isDebugEnabled())
+ getSender().send(buffer);
+ getSender().flush();
+ }
+ finally
{
- QpidByteBuffer dup2 = dup.duplicate();
- byte[] data = new byte[dup2.remaining()];
- dup2.get(data);
- Binary bin = new Binary(data);
- RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
+ buffer.dispose();
}
-
- getSender().send(dup);
- getSender().flush();
-
-
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Nov 10 17:20:00 2015
@@ -137,141 +137,146 @@ class ConsumerTarget_1_0 extends Abstrac
}
Transfer transfer = new Transfer();
- //TODO
-
- Collection<QpidByteBuffer> fragments = message.getFragments();
- QpidByteBuffer payload;
- if(fragments.size() == 1)
- {
- payload = fragments.iterator().next();
- }
- else
+ try
{
- int size = 0;
- for(QpidByteBuffer fragment : fragments)
+ QpidByteBuffer payload = null;
+ //TODO
+ Collection<QpidByteBuffer> fragments = message.getFragments();
+ if(fragments.size() == 1)
{
- size += fragment.remaining();
+ payload = fragments.iterator().next();
}
-
- payload = QpidByteBuffer.allocateDirect(size);
-
- for(QpidByteBuffer fragment : fragments)
+ else
{
- payload.put(fragment.duplicate());
- }
+ int size = 0;
+ for(QpidByteBuffer fragment : fragments)
+ {
+ size += fragment.remaining();
+ }
- payload.flip();
- }
+ payload = QpidByteBuffer.allocateDirect(size);
- if(entry.getDeliveryCount() != 0)
- {
- payload = payload.duplicate();
- ValueHandler valueHandler = new ValueHandler(_typeRegistry);
+ for(QpidByteBuffer fragment : fragments)
+ {
+ payload.put(fragment);
+ fragment.dispose();
+ }
+
+ payload.flip();
+ }
- Header oldHeader = null;
- try
+ if(entry.getDeliveryCount() != 0)
{
- Object value = valueHandler.parse(payload);
- if(value instanceof Header)
+ ValueHandler valueHandler = new ValueHandler(_typeRegistry);
+
+ Header oldHeader = null;
+ try
{
- oldHeader = (Header) value;
+ Object value = valueHandler.parse(payload);
+ if(value instanceof Header)
+ {
+ oldHeader = (Header) value;
+ }
+ else
+ {
+ payload.position(0);
+ }
}
- else
+ catch (AmqpErrorException e)
{
- payload.position(0);
+ //TODO
+ throw new ConnectionScopedRuntimeException(e);
}
- }
- catch (AmqpErrorException e)
- {
- //TODO
- throw new ConnectionScopedRuntimeException(e);
- }
- Header header = new Header();
- if(oldHeader != null)
- {
- header.setDurable(oldHeader.getDurable());
- header.setPriority(oldHeader.getPriority());
- header.setTtl(oldHeader.getTtl());
+ Header header = new Header();
+ if(oldHeader != null)
+ {
+ header.setDurable(oldHeader.getDurable());
+ header.setPriority(oldHeader.getPriority());
+ header.setTtl(oldHeader.getTtl());
+ }
+ header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
+ _sectionEncoder.reset();
+ _sectionEncoder.encodeObject(header);
+ Binary encodedHeader = _sectionEncoder.getEncoding();
+
+ QpidByteBuffer oldPayload = payload;
+ payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
+ payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
+ payload.put(oldPayload);
+ oldPayload.dispose();
+ payload.flip();
}
- header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
- _sectionEncoder.reset();
- _sectionEncoder.encodeObject(header);
- Binary encodedHeader = _sectionEncoder.getEncoding();
-
- QpidByteBuffer oldPayload = payload;
- payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
- payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
- payload.put(oldPayload);
- payload.flip();
- }
- transfer.setPayload(payload);
- byte[] data = new byte[8];
- ByteBuffer.wrap(data).putLong(_deliveryTag++);
- final Binary tag = new Binary(data);
+ transfer.setPayload(payload);
+ byte[] data = new byte[8];
+ ByteBuffer.wrap(data).putLong(_deliveryTag++);
+ final Binary tag = new Binary(data);
- transfer.setDeliveryTag(tag);
+ transfer.setDeliveryTag(tag);
- synchronized(_link.getLock())
- {
- if(_link.isAttached())
+ synchronized(_link.getLock())
{
- if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
+ if(_link.isAttached())
{
- transfer.setSettled(true);
- }
- else
- {
- UnsettledAction action = _acquires
- ? new DispositionAction(tag, entry)
- : new DoNothingAction(tag, entry);
+ if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
+ {
+ transfer.setSettled(true);
+ }
+ else
+ {
+ UnsettledAction action = _acquires
+ ? new DispositionAction(tag, entry)
+ : new DoNothingAction(tag, entry);
- _link.addUnsettled(tag, action, entry);
- }
+ _link.addUnsettled(tag, action, entry);
+ }
- if(_transactionId != null)
- {
- TransactionalState state = new TransactionalState();
- state.setTxnId(_transactionId);
- transfer.setState(state);
- }
- // TODO - need to deal with failure here
- if(_acquires && _transactionId != null)
- {
- ServerTransaction txn = _link.getTransaction(_transactionId);
- if(txn != null)
+ if(_transactionId != null)
+ {
+ TransactionalState state = new TransactionalState();
+ state.setTxnId(_transactionId);
+ transfer.setState(state);
+ }
+ // TODO - need to deal with failure here
+ if(_acquires && _transactionId != null)
{
- txn.addPostTransactionAction(new ServerTransaction.Action(){
+ ServerTransaction txn = _link.getTransaction(_transactionId);
+ if(txn != null)
+ {
+ txn.addPostTransactionAction(new ServerTransaction.Action(){
- public void postCommit()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
+ public void postCommit()
+ {
+ }
- public void onRollback()
- {
- if(entry.isAcquiredBy(getConsumer()))
+ public void onRollback()
{
- entry.release();
- _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
+ if(entry.isAcquiredBy(getConsumer()))
+ {
+ entry.release();
+ _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
+ }
}
- }
- });
- }
+ });
+ }
+ }
+ getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
+ getEndpoint().transfer(transfer);
+ }
+ else
+ {
+ entry.release();
}
- getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
- getEndpoint().transfer(transfer);
- }
- else
- {
- entry.release();
}
}
-
+ finally
+ {
+ transfer.dispose();
+ }
}
public void flushBatched()
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Tue Nov 10 17:20:00 2015
@@ -70,7 +70,7 @@ public class MessageMetaData_1_0 impleme
private Map _appProperties;
private Map _footer;
- private List<QpidByteBuffer> _encodedSections = new ArrayList<>(3);
+ private volatile List<QpidByteBuffer> _encodedSections = new ArrayList<>(3);
private volatile QpidByteBuffer _encoded;
private MessageHeader_1_0 _messageHeader;
@@ -180,7 +180,9 @@ public class MessageMetaData_1_0 impleme
src = QpidByteBuffer.allocateDirect(size);
for(QpidByteBuffer buf : fragments)
{
- src.put(buf.duplicate());
+ QpidByteBuffer duplicate = buf.duplicate();
+ src.put(duplicate);
+ duplicate.dispose();
}
src.flip();
@@ -273,26 +275,9 @@ public class MessageMetaData_1_0 impleme
}
- int pos = 0;
for(QpidByteBuffer buf : fragments)
{
-/*
- if(pos < startBarePos)
- {
- if(pos + buf.remaining() > startBarePos)
- {
- ByteBuffer dup = buf.duplicate();
- dup.position(dup.position()+startBarePos-pos);
- dup.slice();
- encodedSections.add(dup);
- }
- }
- else
-*/
- {
- encodedSections.add(buf.duplicate());
- }
- pos += buf.remaining();
+ encodedSections.add(buf.duplicate());
}
return sections;
@@ -302,6 +287,10 @@ public class MessageMetaData_1_0 impleme
_logger.error("Decoding read section error", e);
throw new IllegalArgumentException(e);
}
+ finally
+ {
+ src.dispose();
+ }
}
@@ -329,7 +318,9 @@ public class MessageMetaData_1_0 impleme
for(QpidByteBuffer bin : _encodedSections)
{
- buf.put(bin.duplicate());
+ QpidByteBuffer duplicate = bin.duplicate();
+ buf.put(duplicate);
+ duplicate.dispose();
}
return buf;
@@ -402,6 +393,11 @@ public class MessageMetaData_1_0 impleme
@Override
public void dispose()
{
+ for(QpidByteBuffer bin : _encodedSections)
+ {
+ bin.dispose();
+ }
+ _encodedSections = null;
_encoded.dispose();
_encoded = null;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Tue Nov 10 17:20:00 2015
@@ -21,7 +21,6 @@
package org.apache.qpid.server.protocol.v1_0;
-import java.lang.ref.SoftReference;
import java.util.Collection;
import java.util.Collections;
@@ -42,7 +41,6 @@ public class Message_1_0 extends Abstrac
.registerSecurityLayer();
public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<Section>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY));
- private volatile SoftReference<Collection<QpidByteBuffer>> _fragmentsRef;
private long _arrivalTime;
private final long _size;
@@ -50,37 +48,14 @@ public class Message_1_0 extends Abstrac
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
{
super(storedMessage, null);
- final Collection<QpidByteBuffer> fragments = restoreFragments(getStoredMessage());
- _fragmentsRef = new SoftReference<>(fragments);
- _size = calculateSize(fragments);
- }
-
- private long calculateSize(final Collection<QpidByteBuffer> fragments)
- {
-
- long size = 0l;
- if(fragments != null)
- {
- for(QpidByteBuffer buf : fragments)
- {
- size += buf.remaining();
- }
- }
- return size;
- }
-
- private static Collection<QpidByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
- {
- return storedMessage.getContent();
+ _size = storedMessage.getMetaData().getContentSize();
}
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
- final Collection<QpidByteBuffer> fragments,
final Object connectionReference)
{
super(storedMessage, connectionReference);
- _fragmentsRef = new SoftReference<>(fragments);
- _size = calculateSize(fragments);
+ _size = storedMessage.getMetaData().getContentSize();
_arrivalTime = System.currentTimeMillis();
}
@@ -125,14 +100,7 @@ public class Message_1_0 extends Abstrac
public Collection<QpidByteBuffer> getFragments()
{
-
- Collection<QpidByteBuffer> fragments = _fragmentsRef.get();
- if(fragments == null)
- {
- fragments = restoreFragments(getStoredMessage());
- _fragmentsRef = new SoftReference<>(fragments);
- }
- return fragments;
+ return getContent();
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Tue Nov 10 17:20:00 2015
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -90,14 +89,15 @@ public class ReceivingLink_1_0 implement
List<QpidByteBuffer> fragments = null;
-
+ org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
+ final Binary deliveryTag = xfr.getDeliveryTag();
if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
{
_incompleteMessage = new ArrayList<Transfer>();
_incompleteMessage.add(xfr);
_resumedMessage = Boolean.TRUE.equals(xfr.getResume());
- _messageDeliveryTag = xfr.getDeliveryTag();
+ _messageDeliveryTag = deliveryTag;
return;
}
else if(_incompleteMessage != null)
@@ -110,9 +110,11 @@ public class ReceivingLink_1_0 implement
}
fragments = new ArrayList<QpidByteBuffer>(_incompleteMessage.size());
+
for(Transfer t : _incompleteMessage)
{
- fragments.add(t.getPayload());
+ fragments.add(t.getPayload().duplicate());
+ t.dispose();
}
_incompleteMessage=null;
@@ -120,8 +122,9 @@ public class ReceivingLink_1_0 implement
else
{
_resumedMessage = Boolean.TRUE.equals(xfr.getResume());
- _messageDeliveryTag = xfr.getDeliveryTag();
- fragments = Collections.singletonList(xfr.getPayload());
+ _messageDeliveryTag = deliveryTag;
+ fragments = Collections.singletonList(xfr.getPayload().duplicate());
+ xfr.dispose();
}
if(_resumedMessage)
@@ -151,19 +154,25 @@ public class ReceivingLink_1_0 implement
mmd = new MessageMetaData_1_0(fragments.toArray(new QpidByteBuffer[fragments.size()]),
_sectionDecoder,
immutableSections);
-
MessageHandle<MessageMetaData_1_0> handle = _vhost.getMessageStore().addMessage(mmd);
for(QpidByteBuffer bareMessageBuf : immutableSections)
{
- handle.addContent(bareMessageBuf.duplicate());
+ handle.addContent(bareMessageBuf);
}
final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
- Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
+ Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
+
+
+ for(QpidByteBuffer fragment: fragments)
+ {
+ fragment.dispose();
+ }
+ fragments = null;
+
MessageReference<Message_1_0> reference = message.newReference();
Binary transactionId = null;
- org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
if(xfrState != null)
{
if(xfrState instanceof TransactionalState)
@@ -203,8 +212,6 @@ public class ReceivingLink_1_0 implement
boolean settled = transaction instanceof AutoCommitTransaction && ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
- final Binary deliveryTag = xfr.getDeliveryTag();
-
if(!settled)
{
_unsettledMap.put(deliveryTag, outcome);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java?rev=1713702&r1=1713701&r2=1713702&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java Tue Nov 10 17:20:00 2015
@@ -34,7 +34,7 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
@@ -80,6 +80,7 @@ public class TxnCoordinatorLink_1_0 impl
QpidByteBuffer payload = null;
+ final Binary deliveryTag = xfr.getDeliveryTag();
if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
{
@@ -104,6 +105,7 @@ public class TxnCoordinatorLink_1_0 impl
for(Transfer t : _incompleteMessage)
{
payload.put(t.getPayload().duplicate());
+ t.dispose();
}
payload.flip();
_incompleteMessage=null;
@@ -111,10 +113,10 @@ public class TxnCoordinatorLink_1_0 impl
}
else
{
- payload = xfr.getPayload();
+ payload = xfr.getPayload().duplicate();
+ xfr.dispose();
}
-
// Only interested in the amqp-value section that holds the message to the coordinator
try
{
@@ -126,6 +128,7 @@ public class TxnCoordinatorLink_1_0 impl
{
Object command = ((AmqpValue) section).getValue();
+
if(command instanceof Declare)
{
Integer txnId = Integer.valueOf(0);
@@ -143,16 +146,15 @@ public class TxnCoordinatorLink_1_0 impl
state.setTxnId(_session.integerToBinary(txnId));
- _endpoint.updateDisposition(xfr.getDeliveryTag(), state, true);
+ _endpoint.updateDisposition(deliveryTag, state, true);
}
else if(command instanceof Discharge)
{
Discharge discharge = (Discharge) command;
- DeliveryState state = xfr.getState();
discharge(_session.binaryToInteger(discharge.getTxnId()), discharge.getFail());
- _endpoint.updateDisposition(xfr.getDeliveryTag(), new Accepted(), true);
+ _endpoint.updateDisposition(deliveryTag, new Accepted(), true);
}
}
@@ -164,6 +166,13 @@ public class TxnCoordinatorLink_1_0 impl
//TODO
_logger.error("AMQP error", e);
}
+ finally
+ {
+ if (payload != null)
+ {
+ payload.dispose();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org