You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Torsten Mielke (JIRA)" <ji...@apache.org> on 2012/08/13 08:55:38 UTC

[jira] [Assigned] (AMQ-3965) Expired msgs not getting acked to broker causing consumer to fill up its prefetch and not getting more msgs.

     [ https://issues.apache.org/jira/browse/AMQ-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Torsten Mielke reassigned AMQ-3965:
-----------------------------------

    Assignee: Torsten Mielke
    
> Expired msgs not getting acked to broker causing consumer to fill up its prefetch and not getting more msgs.
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3965
>                 URL: https://issues.apache.org/jira/browse/AMQ-3965
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JMS client
>    Affects Versions: 5.6.0
>            Reporter: Torsten Mielke
>            Assignee: Torsten Mielke
>              Labels: optimizeDispatch
>         Attachments: AMQ-3956.patch, OptimizeAcknowledgeWithExpiredMsgsTest.java, testcase.tgz
>
>
> It is possible to get a consumer stalled and not receiving any more messages when using optimizeAcknowledge.
> Let me illustrate in an example (JUnit test attached).
> Suppose a consumer with optimizeAcknowledge and a prefetch of 100 msgs.
> The broker's queue contains 105 msg. The first 45 msgs have a very low expiry time, the remaining don't expiry. 
> So the first 100 msgs get dispatched to the consumer (due to prefetch=100). Out of these the first 45 msgs do not get dispatched to consumer code because their expiry has elapsed by the time that are handled in the client. 
> {code:title=ActiveMQMessageConsumer.java}
> public void dispatch(MessageDispatch md) {
>         MessageListener listener = this.messageListener.get();
>         try {
>             [...]
>             synchronized (unconsumedMessages.getMutex()) {
>                 if (!unconsumedMessages.isClosed()) {
>                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
>                         if (listener != null && unconsumedMessages.isRunning()) {
>                             ActiveMQMessage message = createActiveMQMessage(md);
>                             beforeMessageIsConsumed(md);
>                             try {
>                                 boolean expired = message.isExpired();
>                                 if (!expired) {
>                                     listener.onMessage(message);
>                                 }
>                                 afterMessageIsConsumed(md, expired);
> {code}
> listener.onMessage() above is not called as the msg has expired. 
> However it will calls into afterMessagesIsConsumed()
> {code:title=ActiveMQMessageConsumer.java}
>     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>       [...]  
>       if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
> {code}
> and will remove the expired msg from the deliveredMessages list. It then calls into ackLater(). 
> However ackLater() only fires an ack back to the broker when the number of unsent acks has reached 50% of the prefetch value.
> {code:title=ActiveMQMessageConsumer.java}
>  private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
>     [...]
>     if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
>             session.sendAck(pendingAck);
> {code}        
> In our example it has not reached that mark (only 45 expired msgs, i.e. 45%). 
> So the first 45 msgs, which expired before being dispatched, did not cause an ack being sent to the broker.
> Now the next 55 messages get processed. These don't have an expiry so they get dispatched to consumer code. 
> After dispatching each msg to the registered application code, we call into afterMessageIsConsumed() but this time executing a different branch as the msgs are not expired
> {code:title=ActiveMQMessageConsumer.java}
> private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>     [...]
>     else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
>                                     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                     if (ack != null) {
>                                         deliveredMessages.clear();
>                                         ackCounter = 0;
>                                         session.sendAck(ack);
>                                         optimizeAckTimestamp = System.currentTimeMillis();
>                                     }
>                                 }
> {code}
> with optimizeAcknowledge=true we only send an ack back to the broker if either optimizeAcknowledgeTimeOut has elapsed or the ackCounter has reached 65% of the prefetch (100). 
> The timeout will not have kicked in. The ackCounter will be at 55 after processing the last of 100 prefetched messages which is less than 65% of 100. So with the last prefetched msg being processed, it will not generate an ack back to the broker. 
> As a result, the client has processed all prefetched message and will not get any new messages dispatched from the broker. The broker has another 5 msgs on the queue but since it never received an ack from the client, it won't dispatch any further messages. 
> The client is stalled. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira