You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Jean-Baptiste Onofré (Jira)" <ji...@apache.org> on 2021/08/02 13:00:00 UTC

[jira] [Updated] (AMQ-7314) Redelivery counter is not getting incremented when LastDeliveredSeqID > BrokerSequenceId

     [ https://issues.apache.org/jira/browse/AMQ-7314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jean-Baptiste Onofré updated AMQ-7314:
--------------------------------------
    Fix Version/s:     (was: 5.16.3)
                   5.16.4

> Redelivery counter is not getting incremented when LastDeliveredSeqID > BrokerSequenceId
> ----------------------------------------------------------------------------------------
>
>                 Key: AMQ-7314
>                 URL: https://issues.apache.org/jira/browse/AMQ-7314
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.14.4, 5.15.10
>            Reporter: Dhananjay
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>             Fix For: 5.16.4
>
>         Attachments: ActiveMQ_RedeliveryCounter_Fix_Diff.htm
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Issue: Redelivery counter is not getting incremented when LastDeliveredSeqID > BrokerSequenceId
> Version tested : ActiveMQ 5.14.4 and 5.15.10 versions
> Description:
> I have created the two queues, one for sending requests and other for receiving its response.
> The steps for these queues are given below.
>  
> Steps:
> 1. SenderA (Machine A) sends Msg on RequestQueue TO ReceiverB (Machine B) (Redelivery count is 0, Ack Mode:INDIVIDUAL_ACKNOWLEDGE).
> 2. SenderB sends ResponseMsg on Response queue (Machine B) TO ReceiverA (Machine A) (Async Msg Listener, Ack Mode:CLIENT_ACKNOWLEDGE)
> 3. SenderB is waiting for the consumed advisory for ResponseMsg. Request Msg is not yet acknowledged.
>  
>  4. ReceiverA acknowledges the ResponseMsg. ResponseMsg consumed advisory is received by SenderB
> 5. Machine B disconnects from the embedded broker hosted on Machine A
>  
>  6. On Machine B reconnection, Broker sends Msg on RequestQueue again -> ReceiverB (Issue: Redelivery count is 0 instead of 1)
> Analysis:
> 1. The Queue.java has removeSubscription() function which increments the redelivery counter by comparing lastDeliveredSequenceId and BrokerSequenceId.
> 2. The step4 incremented the LastDeliveredSeqID for message consumed advisory. 
> 3. The transport connection updates the lastDeliveredSequenceId for each consumer by calling
>  processRemoveConsumer(consumerId, lastDeliveredSequenceId);
>  4. The condition “ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId” in removeSubscription() doesn’t set markAsRedelivered to true.
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: queue://RequestQueue_334 remove sub: QueueSubscription: consumer=ID:5107-1569853470047-1:1:1:1, destinations=1, dispatched=2, delivered=2, pending=0, prefetch=1, prefetchExtension=2, lastDeliveredSeqId: 17, dequeues: 0, dispatched: 2, inflight: 2, groups: 0
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: LastDeliveredSeqID: 17, message reference: ID:WIN2K8R2-7986-1569853472713-4:1:1:1:1, BrokerSequenceId 7
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: LastDeliveredSeqID: 17, message reference: ID:WIN2K8R2-7986-1569853472713-4:1:1:1:2, BrokerSequenceId 8
> Queue.java , removeSubscription()
> Existing Code in 5.14.4:
> // locate last redelivered in unconsumed list (list in delivery rather than seq order)
>  if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
>  for (MessageReference ref : unAckedMessages) {
>  if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
>  lastDeliveredRef = ref;
>  markAsRedelivered = true;
>  LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId());
>  break;
>  }
>  }
>  }
> Suggested fix:
> // locate last redelivered in unconsumed list (list in delivery rather than seq order)
>  if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
>  for (MessageReference ref : unAckedMessages) {
>  LOG.debug("UnAcked message reference: {}, BrokerSequenceId {}", ref.getMessageId(), ref.getMessageId().getBrokerSequenceId());
>  if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && ref.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) 
>  {
>  lastDeliveredRef = ref;
>  markAsRedelivered = true;
>  LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId());
>  }
>  }
>  }
> Could you please review the analysis and fix?
> Thanks, 
> Dhananjay



--
This message was sent by Atlassian Jira
(v8.3.4#803005)