Posted to issues@activemq.apache.org by "Christopher L. Shannon (Jira)" <ji...@apache.org> on 2020/03/09 13:37:00 UTC
[jira] [Reopened] (AMQ-7314) Redelivery counter is not getting
incremented when LastDeliveredSeqID > BrokerSequenceId
[ https://issues.apache.org/jira/browse/AMQ-7314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Christopher L. Shannon reopened AMQ-7314:
-----------------------------------------
> Redelivery counter is not getting incremented when LastDeliveredSeqID > BrokerSequenceId
> ----------------------------------------------------------------------------------------
>
> Key: AMQ-7314
> URL: https://issues.apache.org/jira/browse/AMQ-7314
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.14.4, 5.15.10
> Reporter: Dhananjay
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Fix For: 5.16.0, 5.15.12
>
> Attachments: ActiveMQ_RedeliveryCounter_Fix_Diff.htm
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Issue: Redelivery counter is not getting incremented when LastDeliveredSeqID > BrokerSequenceId
> Versions tested: ActiveMQ 5.14.4 and 5.15.10
> Description:
> I created two queues: one for sending requests and the other for receiving the responses.
> The steps for these queues are given below.
>
> Steps:
> 1. SenderA (Machine A) sends Msg on RequestQueue to ReceiverB (Machine B). (Redelivery count is 0; ack mode: INDIVIDUAL_ACKNOWLEDGE.)
> 2. SenderB sends ResponseMsg on the Response queue (Machine B) to ReceiverA (Machine A). (Async message listener; ack mode: CLIENT_ACKNOWLEDGE.)
> 3. SenderB waits for the consumed advisory for ResponseMsg. The request Msg is not yet acknowledged.
>
> 4. ReceiverA acknowledges ResponseMsg. SenderB receives the consumed advisory for ResponseMsg.
> 5. Machine B disconnects from the embedded broker hosted on Machine A.
>
> 6. When Machine B reconnects, the broker sends Msg on RequestQueue to ReceiverB again. (Issue: the redelivery count is 0 instead of 1.)
> Analysis:
> 1. Queue.java has a removeSubscription() method that increments the redelivery counter based on a comparison of lastDeliveredSequenceId and BrokerSequenceId.
> 2. Step 4 incremented LastDeliveredSeqID for the message consumed advisory.
> 3. The transport connection updates lastDeliveredSequenceId for each consumer by calling
> processRemoveConsumer(consumerId, lastDeliveredSequenceId);
> 4. The condition "ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId" in removeSubscription() never holds, so markAsRedelivered is not set to true. In the log below, LastDeliveredSeqID is 17 while the unacked messages have BrokerSequenceId 7 and 8.
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: queue://RequestQueue_334 remove sub: QueueSubscription: consumer=ID:5107-1569853470047-1:1:1:1, destinations=1, dispatched=2, delivered=2, pending=0, prefetch=1, prefetchExtension=2, lastDeliveredSeqId: 17, dequeues: 0, dispatched: 2, inflight: 2, groups: 0
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: LastDeliveredSeqID: 17, message reference: ID:WIN2K8R2-7986-1569853472713-4:1:1:1:1, BrokerSequenceId 7
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: LastDeliveredSeqID: 17, message reference: ID:WIN2K8R2-7986-1569853472713-4:1:1:1:2, BrokerSequenceId 8
> Queue.java, removeSubscription()
> Existing Code in 5.14.4:
> // locate last redelivered in unconsumed list (list in delivery rather than seq order)
> if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
>     for (MessageReference ref : unAckedMessages) {
>         if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
>             lastDeliveredRef = ref;
>             markAsRedelivered = true;
>             LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId());
>             break;
>         }
>     }
> }
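
The equality lookup above can be replayed against the values from the FINE log (LastDeliveredSeqID 17, unacked BrokerSequenceId 7 and 8). What follows is only a minimal sketch, not broker code: the class EqualityLookupSketch, the method indexOfLastDelivered, and the plain long[] standing in for unAckedMessages are hypothetical, and the -1 sentinel is an assumed stand-in for RemoveInfo.LAST_DELIVERED_UNSET.

```java
// Hypothetical stand-in for the lookup in Queue.removeSubscription();
// it models only the equality comparison, not the broker itself.
final class EqualityLookupSketch {

    // Assumed sentinel meaning "no last-delivered id was reported".
    static final long LAST_DELIVERED_UNSET = -1;

    /**
     * Returns the index of the first unacked message whose broker sequence id
     * equals lastDeliveredSequenceId, or -1 when no message matches
     * (the case in which markAsRedelivered would stay false).
     */
    static int indexOfLastDelivered(long lastDeliveredSequenceId, long[] unackedBrokerSequenceIds) {
        if (lastDeliveredSequenceId > LAST_DELIVERED_UNSET) {
            for (int i = 0; i < unackedBrokerSequenceIds.length; i++) {
                if (unackedBrokerSequenceIds[i] == lastDeliveredSequenceId) {
                    return i;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] unacked = {7L, 8L}; // BrokerSequenceId values from the log
        System.out.println("lastDelivered=17 -> index " + indexOfLastDelivered(17L, unacked));
        System.out.println("lastDelivered=8  -> index " + indexOfLastDelivered(8L, unacked));
    }
}
```

With the logged values no index is found, which matches the observation that the redelivery counter stays at 0; a match only occurs when the last delivered id happens to belong to a message on this queue.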
> Suggested fix:
> // locate last redelivered in unconsumed list (list in delivery rather than seq order)
> if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
>     for (MessageReference ref : unAckedMessages) {
>         LOG.debug("UnAcked message reference: {}, BrokerSequenceId {}", ref.getMessageId(), ref.getMessageId().getBrokerSequenceId());
>         if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && ref.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) {
>             lastDeliveredRef = ref;
>             markAsRedelivered = true;
>             LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId());
>         }
>     }
> }
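
To make the behaviour of the suggested condition easier to review, it can be pulled out as a standalone predicate. This is a hedged sketch only: SuggestedConditionSketch and marksAsRedelivered are hypothetical names, the -1 sentinel is an assumed stand-in for RemoveInfo.LAST_DELIVERED_UNSET, and the predicate models just the boolean test from the suggested fix, not what the broker does with the flag afterwards.

```java
// Hypothetical predicate mirroring the condition in the suggested fix.
final class SuggestedConditionSketch {

    // Assumed sentinel meaning "no last-delivered id was reported".
    static final long LAST_DELIVERED_UNSET = -1;

    /**
     * True when the suggested fix would set markAsRedelivered for a message
     * with the given broker sequence id.
     */
    static boolean marksAsRedelivered(long lastDeliveredSequenceId, long brokerSequenceId) {
        if (lastDeliveredSequenceId <= LAST_DELIVERED_UNSET) {
            return false; // outer guard: nothing was reported as delivered
        }
        return lastDeliveredSequenceId == 0
                || (lastDeliveredSequenceId > 0 && brokerSequenceId <= lastDeliveredSequenceId);
    }

    public static void main(String[] args) {
        // Values from the log: LastDeliveredSeqID 17, BrokerSequenceId 7 and 8.
        System.out.println(marksAsRedelivered(17L, 7L));
        System.out.println(marksAsRedelivered(17L, 8L));
        // A message sequenced after the last delivered id.
        System.out.println(marksAsRedelivered(17L, 20L));
    }
}
```

Two properties are worth checking during review. Because the suggested loop has no break, lastDeliveredRef ends up as the last matching reference in list order. And when lastDeliveredSequenceId is 0, the condition is true for every unacked message regardless of its BrokerSequenceId.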
> Could you please review the analysis and fix?
> Thanks,
> Dhananjay
--
This message was sent by Atlassian Jira
(v8.3.4#803005)