You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2013/01/31 22:47:13 UTC
[jira] [Commented] (AMQ-4116) Memory usage is not cleared from the
source queue when a message is moved to another queue over the VMTransport
[ https://issues.apache.org/jira/browse/AMQ-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13568146#comment-13568146 ]
Timothy Bish commented on AMQ-4116:
-----------------------------------
Agreed that when copied in ActiveMQConnection there's definitely no need for the copy to retain a reference to the MemoryUsage. Will look at AMQ-4147 also but applying this patch won't hurt anything even if we take action on 4147.
> Memory usage is not cleared from the source queue when a message is moved to another queue over the VMTransport
> ---------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-4116
> URL: https://issues.apache.org/jira/browse/AMQ-4116
> Project: ActiveMQ
> Issue Type: Bug
> Components: Transport
> Affects Versions: 5.7.0
> Reporter: Stirling Chow
> Attachments: AMQ4116.patch, AMQ4116Test.java
>
>
> Reproduction
> ============
> Using VMTransport:
> 1. Produce a message on queue A and verify that queue A's memory usage increases
> 2. Consume the message from queue A and verify that queue A's memory usage decreases.
> 3. Resend the message to queue B.
> Expected: Queue A's memory usage is not increased by the enqueue to queue B.
> Actual: Queue A's memory usage increases and no memory usage increase occurs on queue B.
> Symptom
> =======
> When messages are moved between queues using the VMTransport, they continue to contribute to the memory usage of the source queue rather than the destination queue.
> The correct behaviour (memory usage decreases from queue A and increases in queue B) is exhibited by non-VMTransport (e.g., TCP).
> Cause
> =====
> When the message is first sent to queue A, it's memoryUsage field is set to match queue A's:
> {code:title=org.apache.activemq.broker.region.Queue}
> public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
> final ConnectionContext context = producerExchange.getConnectionContext();
> // There is delay between the client sending it and it arriving at the
> // destination.. it may have expired.
> message.setRegionDestination(this);
> ...
> {code}
> {code:title=org.apache.activemq.command.Message}
> public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
> this.regionDestination = destination;
> if(this.memoryUsage==null) {
> this.memoryUsage=regionDestination.getMemoryUsage();
> }
> }
> {code}
> As the message moves across the transport, it is copied along with the memoryUsage field:
> {code:title=org.apache.activemq.command.Message}
> protected void copy(Message copy) {
> super.copy(copy);
> ...
> copy.memoryUsage=this.memoryUsage;
> ...
> {code}
> When the message is sent to the second queue, memoryUsage is non-null, so setRegionDestination(...) does not update memoryUsage to reflect the new destination queue.
> When the destination queue accepts the message, the memoryUsage of the source queue is (incorrectly) increased:
> {code:title=org.apache.activemq.command.Message}
> public int incrementReferenceCount() {
> int rc;
> int size;
> synchronized (this) {
> rc = ++referenceCount;
> size = getSize();
> }
> if (rc == 1 && getMemoryUsage() != null) {
> getMemoryUsage().increaseUsage(size);
> {code}
> This mal-behaviour is not exhibited by other transports since they serialize Message and memoryUsage is transient. As a result, the call to setRegionDestination(...) will properly update memoryUsage when the message arrives at the destination queue.
> Solution
> ========
> There are a number of possible solutions, any of which would correct the behaviour (although I am unsure what side-effects they may have on other behaviour):
> 1. It seems odd that memoryUsage is copied when Message is copied. If Message.copy(...) is used as a shortcut to avoid serialization/deserialization on VMTransport, then it should have the same semantics and avoid copying transient fields.
> 2. It seems odd that setRegionDestination(...) would not always set the memoryUsage to match the destination's memoryUsage.
> 3. ActiveMQConnection has a comment regarding concessions made for messages transmitted by the VM transport:
> {code:title=org.apache.activemq.ActiveMQConnection}
> public void onCommand(final Object o) {
> final Command command = (Command)o;
> if (!closed.get() && command != null) {
> try {
> command.visit(new CommandVisitorAdapter() {
> @Override
> public Response processMessageDispatch(MessageDispatch md) throws Exception {
> waitForTransportInterruptionProcessingToComplete();
> ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
> if (dispatcher != null) {
> // Copy in case a embedded broker is dispatching via
> // vm://
> // md.getMessage() == null to signal end of queue
> // browse.
> Message msg = md.getMessage();
> if (msg != null) {
> msg = msg.copy();
> msg.setReadOnlyBody(true);
> msg.setReadOnlyProperties(true);
> msg.setRedeliveryCounter(md.getRedeliveryCounter());
> msg.setConnection(ActiveMQConnection.this);
> md.setMessage(msg);
> }
> dispatcher.dispatch(md);
> }
> return null;
> }
> {code}
> Adding a call to msg.setMemoryUsage(null) would address this bug.
> The latter appears to be the least intrusive, although it will only address the case of VMTransport messages moving between producers/consumers. Queue contains shortcut methods for moving messages between queues (e.g., copyMessageTo). I have not verified if these methods exhibit the same behaviour re: memory usage, but if so, they would not be addressed by patching ActiveMQConnection.
> Our main concern is with the reported use case, so I've attached a patch for ActiveMQConnection and unit test to demonstrate the behaviour.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira