Posted to issues@activemq.apache.org by "Fabian González (JIRA)" <ji...@apache.org> on 2017/05/26 19:21:04 UTC

[jira] [Commented] (AMQ-6658) Messenger does not respect order after redelivery

    [ https://issues.apache.org/jira/browse/AMQ-6658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16026716#comment-16026716 ] 

Fabian González commented on AMQ-6658:
--------------------------------------

I think I found what causes the issue concerning the wrong order after redelivery.

There are two active threads which are involved in an apparent race condition:

ActiveMQConnection[xx]Scheduler
ActiveMQ Session Task

1) A Session Task thread checks in ActiveMQSessionExecutor.iterate that there are messages queued on the session (60 messages are prefetched).
2) It dispatches the messages through ActiveMQMessageConsumer.
3) If unconsumedMessages is not running in ActiveMQMessageConsumer.dispatch, it enqueues the message in unconsumedMessages and the message is not delivered to the listener.

So if ActiveMQ Session Task polls messages several times before ActiveMQConnection[xx]Scheduler starts unconsumedMessages in ActiveMQMessageConsumer:1889 (unconsumedMessages.start();), several prefetched messages will be waiting in unconsumedMessages.

The problem arises when ActiveMQ Session Task has already polled messages several times and, while it is polling a new message, ActiveMQConnection[xx]Scheduler starts unconsumedMessages. In that case unconsumedMessages is running when the new message reaches dispatch, so that message is delivered immediately, although the first message enqueued in unconsumedMessages is the one that should have been delivered first.
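
To make the interleaving above easier to follow, here is a minimal single-threaded replay of it. This is only a sketch: the class, the message names m1..m3 and the simplified dispatch are hypothetical stand-ins, not the real ActiveMQ classes, and the two threads are replaced by a fixed sequence of steps so the outcome is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the race; not ActiveMQ code.
class RedeliveryOrderRace {
    // Stand-in for the consumer's unconsumedMessages channel.
    static final Deque<String> unconsumed = new ArrayDeque<>();
    static final List<String> delivered = new ArrayList<>();
    static boolean running = false;

    // Simplified dispatch: deliver directly when running, otherwise buffer.
    static void dispatch(String msg) {
        if (running) {
            delivered.add(msg);
        } else {
            unconsumed.addLast(msg);
        }
    }

    // Replays the problematic interleaving step by step.
    static List<String> replay() {
        unconsumed.clear();
        delivered.clear();
        running = false;   // consumer stopped, waiting for the redelivery delay
        dispatch("m1");    // Session Task polls while stopped: buffered
        dispatch("m2");    // buffered as well
        running = true;    // Scheduler thread starts unconsumedMessages
        dispatch("m3");    // Session Task polls again: delivered immediately
        // The buffered messages are only drained afterwards.
        while (!unconsumed.isEmpty()) {
            delivered.add(unconsumed.pollFirst());
        }
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [m3, m1, m2]
    }
}
```

In this model m3 overtakes m1 and m2, which matches the wrong order I am seeing.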


I think the dispatch method must always deliver the oldest message that was enqueued in unconsumedMessages.

For example in the dispatch method in ActiveMQMessageConsumer, something like this should be done:

    @Override
    public void dispatch(MessageDispatch md)
    {
        MessageListener listener = this.messageListener.get();
        try
        {
            clearMessagesInProgress();
            clearDeliveredList();
            synchronized (unconsumedMessages.getMutex())
            {
                if (!unconsumedMessages.isClosed())
                {
                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage()))
                    {
                        if (listener != null && unconsumedMessages.isRunning())
                        {
                            // otherwise I do not preserve the order for
                            // redelivery
                            unconsumedMessages.enqueue(md);
                            md = unconsumedMessages.dequeueNoWait();
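
The enqueue-then-dequeue idea in the snippet above can be tried in isolation with the following sketch. It is not the ActiveMQ implementation: the class name, the String messages and the drain() helper are assumptions made for illustration, and a plain ArrayDeque stands in for unconsumedMessages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the proposed fix; not ActiveMQ code.
class OrderedDispatchSketch {
    private final Deque<String> unconsumed = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private boolean running = false;

    synchronized void start() { running = true; }
    synchronized void stop() { running = false; }

    // Always enqueue the incoming message at the tail; when running,
    // deliver the oldest buffered message instead of the incoming one.
    synchronized void dispatch(String msg) {
        unconsumed.addLast(msg);
        if (running) {
            delivered.add(unconsumed.pollFirst());
        }
    }

    // Assumed helper: delivers whatever is still buffered, oldest first.
    synchronized void drain() {
        while (running && !unconsumed.isEmpty()) {
            delivered.add(unconsumed.pollFirst());
        }
    }

    synchronized List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    // Same interleaving as in the race: two messages buffered while
    // stopped, then a start, then one more dispatch.
    static List<String> run() {
        OrderedDispatchSketch consumer = new OrderedDispatchSketch();
        consumer.stop();
        consumer.dispatch("m1");
        consumer.dispatch("m2");
        consumer.start();
        consumer.dispatch("m3"); // delivers m1, m3 waits behind m2
        consumer.drain();
        return consumer.delivered();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [m1, m2, m3]
    }
}
```

Each dispatch delivers only one message here, so the remaining buffered messages still depend on a later drain (in the real client, presumably whatever runs after session.executor.wakeup()).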

 

> Messenger does not respect order after redelivery
> -------------------------------------------------
>
>                 Key: AMQ-6658
>                 URL: https://issues.apache.org/jira/browse/AMQ-6658
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.14.3
>            Reporter: Fabian González
>
> I am facing a situation where ActiveMQ does not seem to respect the order of dispatched messages when a redelivery is needed, using activemq-client 5.14.3.
> I am sending 60 messages to a queue with a single consumer, and I've noticed that sometimes, when a message must be redelivered as a consequence of a rollback, another of those 60 messages is served before the redelivered message. There is no maxRedelivery set.
> What I notice debugging ActiveMQMessageConsumer is that the following behaviour may occur:
> - The 60 messages are dispatched in order in:
> ActiveMQMessageConsumer:1376:
>     @Override
>     public void dispatch(MessageDispatch md) {
>         MessageListener listener = this.messageListener.get();
>         try {
>             clearMessagesInProgress();
>             ...
> unconsumedMessages is running, so the message is sent to the listener.
> - A rollback is performed and the message is redelivered (with a default delay):
> ActiveMQMessageConsumer:1305:
>                         if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
>                             // Start up the delivery again a little later.
>                             session.getScheduler().executeAfterDelay(new Runnable() {
>                                 @Override
>                                 public void run() {
>                                     try {
>                                         if (started.get()) {
>                                             start();
>                                         }
>                                     } catch (JMSException e) {
>                                         session.connection.onAsyncException(e);
>                                     }
>                                 }
>                             }, redeliveryDelay);
>                         } else {
>                             start();
>                         }
> Periodically, the session tries to consume its enqueued messages; because the consumer's unconsumedMessages is not running, they are not sent to the listener and are instead enqueued in unconsumedMessages.
> But if the thread scheduled for redelivery starts while that iteration is being performed, unconsumedMessages is started in:
>     public void start() throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         started.set(true);
>         unconsumedMessages.start();
>         session.executor.wakeup();
>     }
> and the message currently being handled by the session (in the other thread) is sent to the listener before the redelivered message, which appears to be an ordering error.


