Posted to users@activemq.apache.org by Fabian Gonzalez <fa...@mulesoft.com> on 2017/07/20 16:13:31 UTC

Messages in session queue delivered before messages enqueued in consumers

Hello, all

I have opened a ticket (AMQ-6775
<https://issues.apache.org/jira/browse/AMQ-6775>) to verify an issue which
I explain below. I have added a possible fix,

https://github.com/fsgonz/activemq/commit/0411242fddb16edba38ead18fa99dd9a79a183bc

but I would like to know if I am correctly understanding the logic:

I found this situation in
org.apache.activemq.ActiveMQSessionExecutor.iterate.
As I understand it, the idea is that messages queued on the consumers are
delivered to their listeners first, and only when there are none are the
messages queued in the session dispatched:
    public boolean iterate() {

        // Deliver any messages queued on the consumer to their listeners.
        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
            if (consumer.iterate()) {
                return true;
            }
        }

        // No messages left queued on the listeners.. so now dispatch messages
        // queued on the session
        MessageDispatch message = messageQueue.dequeueNoWait();
        if (message == null) {
            return false;
        } else {
            dispatch(message);
            return !messageQueue.isEmpty();
        }
    }
Now the following race condition arises:
1) Thread A (ActiveMQ Session Task) invokes ActiveMQSessionExecutor.iterate.
2) This part of the code is executed:
        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
            if (consumer.iterate()) {
                return true;
            }
        }
ActiveMQMessageConsumer.iterate is invoked. There are unconsumed messages
in the consumer, but because unconsumedMessages is not started, null is
returned as if there were no messages queued.
3) Thread A is interrupted.
4) Thread B (ActiveMQConnection[xx]Scheduler) invokes
ActiveMQMessageConsumer.start, and unconsumedMessages is started.
5) Thread A resumes and goes on to deliver the messages queued in the
session. (If thread B had started unconsumedMessages before thread A
checked the consumers, which is what happens most of the time, the messages
in the consumer queue would have been dispatched first.)
6) Thread A dispatches a message from the session while there are still
messages from the consumer pending.
As a result, in some cases a message that has been rolled back and queued
back in the consumer is redelivered after another message from the session
queue (one that was enqueued after the former message).
Maybe the following commit in my fork would fix the issue by adding a further
check to make sure that there are no messages queued in the consumers
before delivering the messages in the session:
https://github.com/fsgonz/activemq/commit/0411242fddb16edba38ead18fa99dd9a79a183bc
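
To make the interleaving in steps 1) to 6) concrete, here is a minimal
single-threaded sketch that replays it in a fixed order. It is a toy model,
not the ActiveMQ code and not the content of the commit above: RaceSketch,
ToyConsumer, deliveryOrder and the recheckConsumer flag are hypothetical
names, and the flag only stands in for the idea of a further check before
dispatching from the session.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: none of these types are ActiveMQ classes.
class RaceSketch {

    // Hypothetical stand-in for a consumer whose unconsumed queue can be stopped.
    static class ToyConsumer {
        final Deque<String> unconsumed = new ArrayDeque<>();
        final List<String> delivered;
        boolean started = false;

        ToyConsumer(List<String> delivered) {
            this.delivered = delivered;
        }

        // Delivers one queued message if started; returns true if it did.
        boolean iterate() {
            if (!started || unconsumed.isEmpty()) {
                return false; // a stopped queue looks the same as an empty one
            }
            delivered.add(unconsumed.poll());
            return true;
        }
    }

    // Replays the interleaving and returns the order in which messages are delivered.
    static List<String> deliveryOrder(boolean recheckConsumer) {
        List<String> delivered = new ArrayList<>();
        ToyConsumer consumer = new ToyConsumer(delivered);
        Deque<String> sessionQueue = new ArrayDeque<>();

        consumer.unconsumed.add("m1"); // rolled back and queued back on the consumer
        sessionQueue.add("m2");        // enqueued later, still on the session

        // Step 2 (thread A): the consumer loop finds nothing, the queue is stopped.
        boolean deliveredFromConsumer = consumer.iterate();

        // Step 4 (thread B): the consumer is started while A is paused.
        consumer.started = true;

        // Steps 5 and 6 (thread A): fall through to the session queue.
        if (!deliveredFromConsumer) {
            if (recheckConsumer) {
                consumer.iterate(); // assumed extra check: drains m1 first
            }
            delivered.add(sessionQueue.poll());
        }

        // A later iteration delivers whatever is still queued on the consumer.
        while (consumer.iterate()) {
            // keep draining
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliveryOrder(false)); // prints [m2, m1]
        System.out.println(deliveryOrder(true));  // prints [m1, m2]
    }
}
```

In this model the re-check restores the expected order, but it only narrows
the window: with real threads, the consumer could still be started between
the re-check and the dispatch unless both happen under the same lock.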

Thanks in advance for any comment,

Fabian

Re: Fwd: Messages in session queue delivered before messages enqueued in consumers

Posted by fabiangonzalez <fa...@mulesoft.com>.
I've added a test to illustrate the conditions under which the race condition
may arise, though it relies heavily on reflection. In a production
environment, this arises once every 3-4 hours with an app that is constantly
rolling back.

https://github.com/apache/activemq/pull/256

Could anyone verify whether this is a feasible way to address the
out-of-order retry?
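
For anyone who wants to reproduce the ordering without waiting hours, the
sketch below shows one way to force two threads into the problematic
interleaving using latches. It is not the test in the pull request and does
not touch any ActiveMQ class: ForcedInterleaving, the thread names and the
event strings are all made up for illustration, and the AtomicBoolean only
plays the role of the "started" state of the consumer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model only: the threads mimic the session task (A) and the scheduler (B).
class ForcedInterleaving {

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        AtomicBoolean started = new AtomicBoolean(false);
        CountDownLatch checkedWhileStopped = new CountDownLatch(1);
        CountDownLatch consumerStarted = new CountDownLatch(1);

        // Thread A: checks the consumer, is "paused", then dispatches from the session.
        Thread a = new Thread(() -> {
            events.add("A: checked consumer, started=" + started.get());
            checkedWhileStopped.countDown();
            awaitQuietly(consumerStarted); // simulates A being interrupted here
            events.add("A: dispatched from session, started=" + started.get());
        }, "toy-session-task");

        // Thread B: starts the consumer only after A has done its check.
        Thread b = new Thread(() -> {
            awaitQuietly(checkedWhileStopped);
            started.set(true);
            events.add("B: started consumer");
            consumerStarted.countDown();
        }, "toy-scheduler");

        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because each event is recorded before the latch that releases the other
thread is counted down, the three events always come out in the same order:
A checks while stopped, B starts the consumer, A dispatches from the session.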






Fwd: Messages in session queue delivered before messages enqueued in consumers

Posted by Tim Bain <tb...@alumni.duke.edu>.
Dev list, could someone please look at Fabian's email from the user list,
and review his questions and his proposed patch to see if his changes
address the issue properly without introducing unwanted side effects?

Thanks,
Tim
---------- Forwarded message ----------
From: "Fabian Gonzalez" <fa...@mulesoft.com>
Date: Jul 20, 2017 10:13 AM
Subject: Messages in session queue delivered before messages enqueued in
consumers
To: <us...@activemq.apache.org>
Cc:


Re: Messages in session queue delivered before messages enqueued in consumers

Posted by Fabian Gonzalez <fa...@mulesoft.com>.
Thank you very much, Tim!

On Thu, Jul 20, 2017 at 11:20 PM, Tim Bain <tb...@alumni.duke.edu> wrote:

> I forwarded this to the developer mailing list (
> http://activemq.2283324.n4.nabble.com/Fwd-Messages-in-
> session-queue-delivered-before-messages-enqueued-in-
> consumers-td4728758.html),
> so between this thread, that thread, and the JIRA entry and pull request,
> hopefully you'll get the feedback you're hoping for. But I can't say for
> sure which place you'll get responses, so watch them all.
>
> Tim
>

Re: Messages in session queue delivered before messages enqueued in consumers

Posted by Tim Bain <tb...@alumni.duke.edu>.
I forwarded this to the developer mailing list (
http://activemq.2283324.n4.nabble.com/Fwd-Messages-in-session-queue-delivered-before-messages-enqueued-in-consumers-td4728758.html),
so between this thread, that thread, and the JIRA entry and pull request,
hopefully you'll get the feedback you're hoping for. But I can't say for
sure which place you'll get responses, so watch them all.

Tim
