You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by djdkedev <dj...@gmail.com> on 2019/10/03 00:26:16 UTC

Redelivery counter is not getting incremented when LastDeliveredSeqID > BrokerSequenceId

ssue: Redelivery counter is not getting incremented when LastDeliveredSeqID >
BrokerSequenceId

Version tested : ActiveMQ 5.14.4 and 5.15.10 versions

Description:
I have created the two queues, one for sending requests and other for
receiving its response.
The steps for these queues are given below.

Steps:

1. SenderA (Machine A) sends Msg on RequestQueue TO ReceiverB (Machine B)
(Redelivery count is 0, Ack Mode:INDIVIDUAL_ACKNOWLEDGE).

2. SenderB sends ResponseMsg on Response queue (Machine B) TO ReceiverA
(Machine A) (Async Msg Listener, Ack Mode:CLIENT_ACKNOWLEDGE)

3. SenderB is waiting for the consumed advisory for ResponseMsg. Request Msg
is not yet acknowledged.

4. ReceiverA acknowledges the ResponseMsg. ResponseMsg consumed advisory is
received by SenderB

5. Machine B disconnects from the embedded broker hosted on Machine A

6. On Machine B reconnection, Broker sends Msg on RequestQueue again ->
ReceiverB (Issue: Redelivery count is 0 instead of 1)

Analysis:
1. The Queue.java has removeSubscription() function which increments the
redelivery counter by comparing lastDeliveredSequenceId and
BrokerSequenceId.
2. The step4 incremented the LastDeliveredSeqID for message consumed
advisory.
3. The transport connection updates the lastDeliveredSequenceId for each
consumer by calling
processRemoveConsumer(consumerId, lastDeliveredSequenceId);
4. The condition “ref.getMessageId().getBrokerSequenceId() ==
lastDeliveredSequenceId” in removeSubscription() doesn’t set
markAsRedelivered to true.

org.apache.activemq.broker.region.Queue removeSubscription
FINE: queue://RequestQueue_334 remove sub: QueueSubscription:
consumer=ID:5107-1569853470047-1:1:1:1, destinations=1, dispatched=2,
delivered=2, pending=0, prefetch=1, prefetchExtension=2, lastDeliveredSeqId:
17, dequeues: 0, dispatched: 2, inflight: 2, groups: 0
org.apache.activemq.broker.region.Queue removeSubscription
FINE: LastDeliveredSeqID: 17, message reference:
ID:WIN2K8R2-7986-1569853472713-4:1:1:1:1, BrokerSequenceId 7
org.apache.activemq.broker.region.Queue removeSubscription
FINE: LastDeliveredSeqID: 17, message reference:
ID:WIN2K8R2-7986-1569853472713-4:1:1:1:2, BrokerSequenceId 8

Queue.java , removeSubscription()

Existing Code in 5.14.4:

// locate last redelivered in unconsumed list (list in delivery rather than
seq order)
if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
for (MessageReference ref : unAckedMessages) {
if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
lastDeliveredRef = ref;
markAsRedelivered = true;
LOG.debug("found lastDeliveredSeqID: {}, message reference: {}",
lastDeliveredSequenceId, ref.getMessageId());
break;
}
}
}

Suggested fix:

// locate last redelivered in unconsumed list (list in delivery rather than
seq order)
if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
for (MessageReference ref : unAckedMessages) {
LOG.debug("UnAcked message reference: {}, BrokerSequenceId {}",
ref.getMessageId(), ref.getMessageId().getBrokerSequenceId());
if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 &&
ref.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId))
{
lastDeliveredRef = ref;
markAsRedelivered = true;
LOG.debug("found lastDeliveredSeqID: {}, message reference: {}",
lastDeliveredSequenceId, ref.getMessageId());
}
}
}

Could you please review the analysis and fix?

Thanks,
Dhananjay



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: Redelivery counter is not getting incremented when LastDeliveredSeqID > BrokerSequenceId

Posted by djdkedev <dj...@gmail.com>.
Your understanding is correct. Thank you for your review.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: Redelivery counter is not getting incremented when LastDeliveredSeqID > BrokerSequenceId

Posted by Tim Bain <tb...@alumni.duke.edu>.
I see you submitted https://issues.apache.org/jira/browse/AMQ-7314 for
this; thank you for that.

I understand your fix to be a switch from "is the message I'm sending the
same as the most recent one I've already sent" to "is the message I'm
sending the same as or earlier than the most recent one I've already sent."
If I'm understanding that correctly, then I agree with your analysis and
your fix.

Tim

On Wed, Oct 2, 2019 at 6:44 PM djdkedev <dj...@gmail.com> wrote:

> ssue: Redelivery counter is not getting incremented when
> LastDeliveredSeqID >
> BrokerSequenceId
>
> Version tested : ActiveMQ 5.14.4 and 5.15.10 versions
>
> Description:
> I have created the two queues, one for sending requests and other for
> receiving its response.
> The steps for these queues are given below.
>
> Steps:
>
> 1. SenderA (Machine A) sends Msg on RequestQueue TO ReceiverB (Machine B)
> (Redelivery count is 0, Ack Mode:INDIVIDUAL_ACKNOWLEDGE).
>
> 2. SenderB sends ResponseMsg on Response queue (Machine B) TO ReceiverA
> (Machine A) (Async Msg Listener, Ack Mode:CLIENT_ACKNOWLEDGE)
>
> 3. SenderB is waiting for the consumed advisory for ResponseMsg. Request
> Msg
> is not yet acknowledged.
>
> 4. ReceiverA acknowledges the ResponseMsg. ResponseMsg consumed advisory is
> received by SenderB
>
> 5. Machine B disconnects from the embedded broker hosted on Machine A
>
> 6. On Machine B reconnection, Broker sends Msg on RequestQueue again ->
> ReceiverB (Issue: Redelivery count is 0 instead of 1)
>
> Analysis:
> 1. The Queue.java has removeSubscription() function which increments the
> redelivery counter by comparing lastDeliveredSequenceId and
> BrokerSequenceId.
> 2. The step4 incremented the LastDeliveredSeqID for message consumed
> advisory.
> 3. The transport connection updates the lastDeliveredSequenceId for each
> consumer by calling
> processRemoveConsumer(consumerId, lastDeliveredSequenceId);
> 4. The condition “ref.getMessageId().getBrokerSequenceId() ==
> lastDeliveredSequenceId” in removeSubscription() doesn’t set
> markAsRedelivered to true.
>
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: queue://RequestQueue_334 remove sub: QueueSubscription:
> consumer=ID:5107-1569853470047-1:1:1:1, destinations=1, dispatched=2,
> delivered=2, pending=0, prefetch=1, prefetchExtension=2,
> lastDeliveredSeqId:
> 17, dequeues: 0, dispatched: 2, inflight: 2, groups: 0
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: LastDeliveredSeqID: 17, message reference:
> ID:WIN2K8R2-7986-1569853472713-4:1:1:1:1, BrokerSequenceId 7
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: LastDeliveredSeqID: 17, message reference:
> ID:WIN2K8R2-7986-1569853472713-4:1:1:1:2, BrokerSequenceId 8
>
> Queue.java , removeSubscription()
>
> Existing Code in 5.14.4:
>
> // locate last redelivered in unconsumed list (list in delivery rather than
> seq order)
> if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
> for (MessageReference ref : unAckedMessages) {
> if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
> lastDeliveredRef = ref;
> markAsRedelivered = true;
> LOG.debug("found lastDeliveredSeqID: {}, message reference: {}",
> lastDeliveredSequenceId, ref.getMessageId());
> break;
> }
> }
> }
>
> Suggested fix:
>
> // locate last redelivered in unconsumed list (list in delivery rather than
> seq order)
> if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
> for (MessageReference ref : unAckedMessages) {
> LOG.debug("UnAcked message reference: {}, BrokerSequenceId {}",
> ref.getMessageId(), ref.getMessageId().getBrokerSequenceId());
> if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 &&
> ref.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId))
> {
> lastDeliveredRef = ref;
> markAsRedelivered = true;
> LOG.debug("found lastDeliveredSeqID: {}, message reference: {}",
> lastDeliveredSequenceId, ref.getMessageId());
> }
> }
> }
>
> Could you please review the analysis and fix?
>
> Thanks,
> Dhananjay
>
>
>
> --
> Sent from:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
>