Posted to issues@activemq.apache.org by "Ryan Doyle (JIRA)" <ji...@apache.org> on 2015/08/14 03:13:46 UTC

[jira] [Created] (AMQ-5927) Acknowledged messages marked for redelivery stop new messages being consumed

Ryan Doyle created AMQ-5927:
-------------------------------

             Summary: Acknowledged messages marked for redelivery stop new messages being consumed
                 Key: AMQ-5927
                 URL: https://issues.apache.org/jira/browse/AMQ-5927
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.11.1
         Environment: Ubuntu 12.04

java version "1.7.0_75"
Java(TM) SE Runtime Environment (build 1.7.0_75-b13)
Java HotSpot(TM) 64-Bit Server VM (build 24.75-b04, mixed mode)
            Reporter: Ryan Doyle
             Fix For: 5.x


Hi

I am seeing an issue where consumers get stuck, unable to consume messages, after the underlying persistence layer fails to store a message (e.g. out of disk space).

When a subscription is removed from a queue (the subscription gets forcibly removed because of the persistence adapter failure), messages are taken off the subscription as part of the shutdown and are then stored in {{redeliveredWaitingDispatch}}.

{code:title=org.apache.activemq.broker.region.Queue:547}
List<MessageReference> unAckedMessages = sub.remove(context, this);
{code}


{code:title=org.apache.activemq.broker.region.Queue:583}
if (!qmr.isDropped()) {
    redeliveredWaitingDispatch.addMessageLast(qmr);
}
{code}

The only problem is that /some/ of these messages are actually acked (see {{org.apache.activemq.broker.region.QueueMessageReference#isAcked}}). I have put breakpoints in this section of the code and confirmed this.

This later becomes a problem: if any acked messages end up in {{redeliveredWaitingDispatch}}, all message consumption stops.

If you follow {{org.apache.activemq.broker.region.Queue#doDispatch:1957}}, you can see that {{redeliveredWaitingDispatch}} /always/ takes precedence. New messages that come in don't get dispatched until {{redeliveredWaitingDispatch}} is empty. The problem is that this /never/ gets emptied.

In {{org.apache.activemq.broker.region.Queue#doActualDispatch:2002}} at line {{2028}}, we only dispatch and remove the message if the {{QueueMessageReference}} is not acknowledged.

{code}
if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
    // Dispatch it.
    s.add(node);
    LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());
    iterator.remove();
    target = s;
    break;
}
{code}
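
To make the stall easier to see, here is a minimal, self-contained model of the behaviour described above. It is only a sketch, not the ActiveMQ source: {{DispatchStallSketch}}, {{Ref}}, {{newMessages}} and {{delivered}} are hypothetical names I made up, and consumers, selectors and message groups are left out entirely.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified model of the dispatch behaviour described in this report; not the ActiveMQ source. */
final class DispatchStallSketch {

    /** Hypothetical stand-in for QueueMessageReference. */
    static final class Ref {
        final String id;
        final boolean acked;

        Ref(String id, boolean acked) {
            this.id = id;
            this.acked = acked;
        }
    }

    final List<Ref> redeliveredWaitingDispatch = new ArrayList<>();
    final List<Ref> newMessages = new ArrayList<>(); // stands in for newly arrived messages
    final List<String> delivered = new ArrayList<>();

    /** Mirrors the quoted guard: a reference is dispatched and removed only if it is not acked. */
    private void dispatchFrom(List<Ref> pending) {
        for (Iterator<Ref> it = pending.iterator(); it.hasNext();) {
            Ref ref = it.next();
            if (!ref.acked) {
                delivered.add(ref.id);
                it.remove();
            }
            // An acked reference is skipped but stays in the list.
        }
    }

    /** Redelivered messages take precedence; new messages wait until that list is empty. */
    void doDispatch() {
        dispatchFrom(redeliveredWaitingDispatch);
        if (redeliveredWaitingDispatch.isEmpty()) {
            dispatchFrom(newMessages);
        }
    }

    public static void main(String[] args) {
        DispatchStallSketch queue = new DispatchStallSketch();
        queue.redeliveredWaitingDispatch.add(new Ref("acked-1", true));
        queue.newMessages.add(new Ref("new-1", false));
        queue.doDispatch();
        System.out.println("delivered=" + queue.delivered
                + ", stuck=" + queue.redeliveredWaitingDispatch.size());
    }
}
```

In this model, a single acked reference in {{redeliveredWaitingDispatch}} is never removed, so {{new-1}} is never delivered no matter how many times {{doDispatch}} runs.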


I have a simple patch that will fix the issue, but I am not sure of the full implications of the patch. I am hoping one of the ActiveMQ developers could chime in here:

{code}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c0a237f..64717f5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -580,7 +580,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                             }
                         }
                     }
-                    if (!qmr.isDropped()) {
+                    if (!qmr.isDropped() && !qmr.isAcked()) {
                         redeliveredWaitingDispatch.addMessageLast(qmr);
                     }
                 }

{code}
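
The effect of the extra {{!qmr.isAcked()}} check can be illustrated with a small stand-alone sketch. Again, this is only a model under my own assumptions: {{RemoveSubscriptionSketch}}, {{Ref}}, {{requeueForRedelivery}} and the {{patched}} flag are hypothetical and exist only to compare the two conditions side by side.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the requeue decision made when a subscription is removed; not the ActiveMQ source. */
final class RemoveSubscriptionSketch {

    /** Hypothetical stand-in for QueueMessageReference. */
    static final class Ref {
        final String id;
        final boolean dropped;
        final boolean acked;

        Ref(String id, boolean dropped, boolean acked) {
            this.id = id;
            this.dropped = dropped;
            this.acked = acked;
        }
    }

    /**
     * Returns the references that would be queued for redelivery.
     * With patched == false only dropped references are excluded (current behaviour);
     * with patched == true acked references are excluded as well (proposed behaviour).
     */
    static List<Ref> requeueForRedelivery(List<Ref> takenFromSubscription, boolean patched) {
        List<Ref> redeliveredWaitingDispatch = new ArrayList<>();
        for (Ref qmr : takenFromSubscription) {
            boolean keep = !qmr.dropped && (!patched || !qmr.acked);
            if (keep) {
                redeliveredWaitingDispatch.add(qmr);
            }
        }
        return redeliveredWaitingDispatch;
    }

    public static void main(String[] args) {
        List<Ref> taken = new ArrayList<>();
        taken.add(new Ref("unacked-1", false, false));
        taken.add(new Ref("acked-1", false, true));
        System.out.println("unpatched size=" + requeueForRedelivery(taken, false).size());
        System.out.println("patched size=" + requeueForRedelivery(taken, true).size());
    }
}
```

With the patched condition, the acked reference never reaches the redelivery list, so in this model nothing is left behind that the dispatch guard would refuse to remove. Whether skipping those references is safe for the rest of the broker is exactly the open question above.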

To reproduce this issue, I filled the disk completely by putting messages on a queue. I then used dd to zero out any remaining space on the disk. I then consumed the messages from the queue. /Some/ messages will be consumed, but at some point the consumer will fail due to the out-of-space errors. It's at this point that some acked messages will be put on the redeliveredWaitingDispatch queue. This can be confirmed by adding a conditional breakpoint at org.apache.activemq.broker.region.Queue:584 where qmr.isAcked() == true.



