Posted to dev@activemq.apache.org by "Stirling Chow (JIRA)" <ji...@apache.org> on 2012/10/19 02:48:03 UTC

[jira] [Updated] (AMQ-4116) Memory usage is not cleared from the source queue when a message is moved to another queue over the VMTransport

     [ https://issues.apache.org/jira/browse/AMQ-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-4116:
-------------------------------

    Description: 
Reproduction
============
Using VMTransport:

1. Produce a message on queue A and verify that queue A's memory usage increases.
2. Consume the message from queue A and verify that queue A's memory usage decreases.
3. Resend the message to queue B.

Expected: Queue A's memory usage is not increased by the enqueue to queue B.
Actual: Queue A's memory usage increases and no memory usage increase occurs on queue B.

Symptom
=======
When messages are moved between queues using the VMTransport, they continue to contribute to the memory usage of the source queue rather than the destination queue.

The correct behaviour (memory usage decreases on queue A and increases on queue B) is exhibited by transports other than VMTransport (e.g., TCP).

Cause
=====
When the message is first sent to queue A, its memoryUsage field is set to match queue A's:

{code:title=org.apache.activemq.broker.region.Queue}
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
    final ConnectionContext context = producerExchange.getConnectionContext();
    // There is delay between the client sending it and it arriving at the
    // destination.. it may have expired.
    message.setRegionDestination(this);
...
{code}

{code:title=org.apache.activemq.command.Message}
public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
    this.regionDestination = destination;
    if(this.memoryUsage==null) {
        this.memoryUsage=regionDestination.getMemoryUsage();
    }
}
{code}

As the message moves across the transport, it is copied along with the memoryUsage field:

{code:title=org.apache.activemq.command.Message}
protected void copy(Message copy) {
    super.copy(copy);
...
    copy.memoryUsage=this.memoryUsage;
...
{code}

When the message is sent to the second queue, memoryUsage is non-null, so setRegionDestination(...) does not update memoryUsage to reflect the new destination queue.

When the destination queue accepts the message, the memoryUsage of the source queue is (incorrectly) increased:

{code:title=org.apache.activemq.command.Message}
public int incrementReferenceCount() {
    int rc;
    int size;
    synchronized (this) {
        rc = ++referenceCount;
        size = getSize();
    }

    if (rc == 1 && getMemoryUsage() != null) {
        getMemoryUsage().increaseUsage(size);
...
{code}

This misbehaviour is not exhibited by other transports because they serialize Message, and memoryUsage is transient. The deserialized message therefore arrives with a null memoryUsage, so the call to setRegionDestination(...) properly sets memoryUsage when the message arrives at the destination queue.
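
The sequence above can be sketched in a small self-contained model. This is only an illustration, not ActiveMQ code: Usage, SketchQueue, SketchMessage and moveOverVmStyleCopy are hypothetical stand-ins that mirror the quoted snippets, and the decrement logic is assumed to be the mirror image of incrementReferenceCount().

```java
import java.util.concurrent.atomic.AtomicLong;

public class MemoryUsageLeakSketch {

    /** Hypothetical stand-in for a destination's memory usage counter. */
    static final class Usage {
        private final AtomicLong bytes = new AtomicLong();
        void increaseUsage(long n) { bytes.addAndGet(n); }
        void decreaseUsage(long n) { bytes.addAndGet(-n); }
        long getUsage() { return bytes.get(); }
    }

    /** Hypothetical stand-in for a queue; it only owns a usage counter. */
    static final class SketchQueue {
        final Usage memoryUsage = new Usage();

        void send(SketchMessage m) {
            m.setRegionDestination(this);
            m.incrementReferenceCount();
        }

        void consume(SketchMessage m) {
            m.decrementReferenceCount();
        }
    }

    /** Hypothetical stand-in for Message, reduced to the fields discussed above. */
    static final class SketchMessage {
        private final int size;
        private Usage memoryUsage;
        private int referenceCount;

        SketchMessage(int size) { this.size = size; }

        // Mirrors the quoted setRegionDestination(): only assigns when null.
        void setRegionDestination(SketchQueue q) {
            if (memoryUsage == null) {
                memoryUsage = q.memoryUsage;
            }
        }

        // Mirrors the quoted copy(): the usage reference travels with the copy.
        SketchMessage copy() {
            SketchMessage c = new SketchMessage(size);
            c.memoryUsage = this.memoryUsage;
            return c;
        }

        synchronized void incrementReferenceCount() {
            if (++referenceCount == 1 && memoryUsage != null) {
                memoryUsage.increaseUsage(size);
            }
        }

        // Assumption: the decrement is symmetric to the increment.
        synchronized void decrementReferenceCount() {
            if (--referenceCount == 0 && memoryUsage != null) {
                memoryUsage.decreaseUsage(size);
            }
        }
    }

    /** Returns {usage of A, usage of B} after moving one message from A to B. */
    static long[] moveOverVmStyleCopy(int size) {
        SketchQueue a = new SketchQueue();
        SketchQueue b = new SketchQueue();
        SketchMessage original = new SketchMessage(size);
        a.send(original);                            // A's usage goes up
        SketchMessage delivered = original.copy();   // copied, not serialized
        a.consume(original);                         // A's usage goes back down
        b.send(delivered);                           // stale reference: A is charged again
        return new long[] { a.memoryUsage.getUsage(), b.memoryUsage.getUsage() };
    }

    public static void main(String[] args) {
        long[] usage = moveOverVmStyleCopy(100);
        System.out.println("A=" + usage[0] + " B=" + usage[1]);
    }
}
```

In this model the message ends up charged to queue A even though it now sits on queue B, which matches the "Actual" result in the reproduction.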

Solution
========
There are a number of possible solutions, any of which would correct the behaviour (although I am unsure what side-effects they may have on other behaviour):

1. It seems odd that memoryUsage is copied when Message is copied.  If Message.copy(...) is used as a shortcut to avoid serialization/deserialization on VMTransport, then it should have the same semantics and avoid copying transient fields.

2. It seems odd that setRegionDestination(...) would not always set the memoryUsage to match the destination's memoryUsage.

3. ActiveMQConnection has a comment regarding concessions made for messages transmitted by the VM transport:

{code:title=org.apache.activemq.ActiveMQConnection}
public void onCommand(final Object o) {
    final Command command = (Command)o;
    if (!closed.get() && command != null) {
        try {
            command.visit(new CommandVisitorAdapter() {
                @Override
                public Response processMessageDispatch(MessageDispatch md) throws Exception {
                    waitForTransportInterruptionProcessingToComplete();
                    ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
                    if (dispatcher != null) {
                        // Copy in case a embedded broker is dispatching via
                        // vm://
                        // md.getMessage() == null to signal end of queue
                        // browse.
                        Message msg = md.getMessage();
                        if (msg != null) {
                            msg = msg.copy();
                            msg.setReadOnlyBody(true);
                            msg.setReadOnlyProperties(true);
                            msg.setRedeliveryCounter(md.getRedeliveryCounter());
                            msg.setConnection(ActiveMQConnection.this);
                            md.setMessage(msg);
                        }
                        dispatcher.dispatch(md);
                    }
                    return null;
                }
{code}

Adding a call to msg.setMemoryUsage(null) would address this bug.
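
A minimal sketch of that idea, again using hypothetical stand-in classes rather than the real ActiveMQ types (only the name setMemoryUsage is taken from the text above; dispatch and accountedToDestination are invented for illustration). It checks which usage object the delivered copy references after it is sent to queue B, with and without the extra call.

```java
public class ClearUsageOnDispatchSketch {

    /** Hypothetical marker object; only its identity matters here. */
    static final class Usage { }

    /** Hypothetical stand-in for a queue that owns one usage object. */
    static final class SketchQueue {
        final Usage memoryUsage = new Usage();
    }

    /** Hypothetical stand-in for Message. */
    static final class SketchMessage {
        Usage memoryUsage;

        void setMemoryUsage(Usage usage) {
            this.memoryUsage = usage;
        }

        // Same null check as the quoted setRegionDestination().
        void setRegionDestination(SketchQueue q) {
            if (memoryUsage == null) {
                memoryUsage = q.memoryUsage;
            }
        }

        // Same behaviour as the quoted copy(): the reference is carried over.
        SketchMessage copy() {
            SketchMessage c = new SketchMessage();
            c.memoryUsage = this.memoryUsage;
            return c;
        }
    }

    /** Stand-in for the consumer-side dispatch, with the proposed call made optional. */
    static SketchMessage dispatch(SketchMessage brokerSide, boolean clearUsage) {
        SketchMessage msg = brokerSide.copy();
        if (clearUsage) {
            msg.setMemoryUsage(null); // the proposed one-line change
        }
        return msg;
    }

    /** True if a message moved from A to B ends up referencing B's usage. */
    static boolean accountedToDestination(boolean clearUsage) {
        SketchQueue a = new SketchQueue();
        SketchQueue b = new SketchQueue();
        SketchMessage m = new SketchMessage();
        m.setRegionDestination(a);                 // first send: bound to A
        SketchMessage delivered = dispatch(m, clearUsage);
        delivered.setRegionDestination(b);         // resend to B
        return delivered.memoryUsage == b.memoryUsage;
    }

    public static void main(String[] args) {
        System.out.println("without clearing: " + accountedToDestination(false));
        System.out.println("with clearing:    " + accountedToDestination(true));
    }
}
```

Clearing the field on the copy restores the behaviour that serializing transports get for free, because the null check in setRegionDestination(...) then succeeds at the destination.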

The third option appears to be the least intrusive, although it will only address the case of VMTransport messages moving between producers/consumers. Queue contains shortcut methods for moving messages between queues (e.g., copyMessageTo). I have not verified whether these methods exhibit the same behaviour with respect to memory usage, but if so, they would not be addressed by patching ActiveMQConnection.

Our main concern is with the reported use case, so I've attached a patch for ActiveMQConnection and a unit test to demonstrate the behaviour.


    
> Memory usage is not cleared from the source queue when a message is moved to another queue over the VMTransport
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4116
>                 URL: https://issues.apache.org/jira/browse/AMQ-4116
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.7.0
>            Reporter: Stirling Chow
