Posted to dev@qpid.apache.org by "Robbie Gemmell (Assigned) (JIRA)" <ji...@apache.org> on 2012/04/03 19:08:25 UTC

[jira] [Assigned] (QPID-3927) Java Broker Priority Queue dispatch not working correctly

     [ https://issues.apache.org/jira/browse/QPID-3927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robbie Gemmell reassigned QPID-3927:
------------------------------------

    Assignee: Robbie Gemmell
    
> Java Broker Priority Queue dispatch not working correctly
> ---------------------------------------------------------
>
>                 Key: QPID-3927
>                 URL: https://issues.apache.org/jira/browse/QPID-3927
>             Project: Qpid
>          Issue Type: Bug
>          Components: Java Broker
>    Affects Versions: 0.16
>         Environment: Ubuntu 10.04 LTS, 
> Java Broker (0.16), Java Client (0.16), BDBMessageStore backend.
>            Reporter: Praveen Murugesan
>            Assignee: Robbie Gemmell
>            Priority: Critical
>              Labels: broker, java, priority
>         Attachments: QpidPriorityQueueRaceIssue.java, QpidPriorityQueueRaceIssue2.java
>
>
> The following steps reproduce the issue.
> 1) I create a priority Queue (with 10 levels) - Q1
> 2) I create a listener for Q1 and attach it to a consumer.
> 3) I enqueue a message to the priority Queue with priority 0.
> 4) The message is dequeued by the subscribed consumer. In the handler callback, I re-enqueue the dequeued message to the same queue Q1 with the default priority (4), using another session, and commit that session.
> 5) Commit the consumer session.
> 6) Ideally, the message I re-enqueued should be delivered to the consumer when the handler returns. In practice the delivery is intermittent: sometimes it happens and sometimes it does not.
> I decided to get to the bottom of the problem and looked into the Java Broker code. This is what I see happening:
> I realized that the problem was in the logic of the method
> public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node); in priorityQueueList.java
> This method walks the priority list using the node passed in as the base node, and continues to the end of the list. This causes a problem under race conditions in which new messages arrive at a higher priority while a message of lower priority is still being worked on.
> Case 1)
> 1) I create a priority Queue (with 10 levels) - Q1
> 2) I create a listener for Q1 and attach it to a consumer.
> 3) I enqueue a message to the priority Queue with priority 0.
> 4) At this point, since the subscriber is free/available, the enqueued message is delivered to the subscriber synchronously. QueueContext (Last Seen -> msg1 in P0(acquired state), released entry -> null)
> 5) Enqueue a message to P4 ~ delivery is async as the subscriber is busy. QueueContext(last seen -> msg1 in p0(acquired state), released entry -> msg1 in p4(available state)).
> 6) Commit the consumer session and return from the handler.
> 7) msg in p4 dispatched, QueueContext(last seen -> msg1 in p4(acquired state))
> 8) QueueRunner runs again, and updates the QueueContext(last seen -> msg1 in p0) ~ as msg 1 in P0 is still in acquired state.
> 9) msg 2 now enqueued to p4Q. QueueContext(last seen -> msg1 in p0, msg2 in p4)
> 10) Now the subscriber cannot make progress, because the comparison for the next message [QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);] always goes to _entries.next(lastSeen). Since lastSeen is at the last priority, the broker thinks it has nothing left to process.
> Example Sequence for problem described above.
> P4 Q -> msg1 -> msg2(arrives after msg1 of p4 is dispatched, but before msg1 of p0 status changes to deleted).
> p0 Q -> msg1 
> In the above scenario, I'm still unsure about the race in the state change from acquired to deleted.
> Case 2)
> 1) I create a priority Queue (with 10 levels) - Q1
> 2) I enqueue a message with priority P4.
> 3) I consume the message synchronously (consumer.receive), so that the message is now marked deleted.
> 4) I commit/close the consumer
> 5) I create a new listener for Q1 and attach it to a consumer.
> 6) I enqueue a message to the priority Queue with priority 0.
> 7) At this point, since the subscriber is free/available, the enqueued message is delivered to the subscriber synchronously. QueueContext (Last Seen -> msg1 in P0(acquired state), released entry -> null)
> 8) Enqueue a message to P4 ~ delivery is async as the subscriber is busy. QueueContext(last seen -> msg1 in p0(acquired state), released entry -> msg2 in p4(available state)).
> 9) Commit the consumer session and return from the handler.
> 10) Now the QueueRunner tries to make progress, but the subscriber cannot, because the comparison for the next message [QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);] always goes to _entries.next(lastSeen). Since lastSeen is at the last priority, the broker thinks it has nothing left to process.
> Example Sequence for problem described above. Here msg1 is consumed synchronously before starting the asynchronous consumer.
> P4 Q -> msg1(deleted) -> msg2
> p0 Q -> msg1 
> I hope the notes above are clear. Either way, I've attached a test that reproduces the issue described here! :)
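The stall described in the cases above can be illustrated with a small standalone model. This is only a sketch, not the broker's code: the classes Entry and PriorityList and the method selectNext are hypothetical stand-ins, and it assumes that compareTo orders entries by arrival id and that next() only moves forward within a sub-list and then on to lower priorities. Under those assumptions, the selection expression quoted in the report never returns the newly enqueued P4 message once lastSeen points at the P0 entry.

```java
import java.util.ArrayList;
import java.util.List;

class PriorityCursorStall {
    static final int PRIORITIES = 10;

    /** Hypothetical queue entry; compareTo uses arrival order (an assumption). */
    static final class Entry implements Comparable<Entry> {
        final long id;
        final int priority;
        final String name;

        Entry(long id, int priority, String name) {
            this.id = id;
            this.priority = priority;
            this.name = name;
        }

        @Override
        public int compareTo(Entry other) {
            return Long.compare(id, other.id);
        }
    }

    /** Hypothetical priority list: one sub-list per priority level, 9 is highest. */
    static final class PriorityList {
        private final List<List<Entry>> lists = new ArrayList<>();
        private long nextId = 1;

        PriorityList() {
            for (int i = 0; i < PRIORITIES; i++) {
                lists.add(new ArrayList<>());
            }
        }

        Entry add(int priority, String name) {
            Entry e = new Entry(nextId++, priority, name);
            lists.get(priority).add(e);
            return e;
        }

        /** Walks forward from node: the rest of its sub-list, then lower priorities only. */
        Entry next(Entry node) {
            List<Entry> own = lists.get(node.priority);
            int idx = own.indexOf(node);
            if (idx + 1 < own.size()) {
                return own.get(idx + 1);
            }
            for (int p = node.priority - 1; p >= 0; p--) {
                if (!lists.get(p).isEmpty()) {
                    return lists.get(p).get(0);
                }
            }
            return null; // nothing at or below this priority
        }
    }

    /** Same shape as the selection expression quoted in the report. */
    static Entry selectNext(PriorityList entries, Entry lastSeen, Entry releasedNode) {
        return (releasedNode != null && lastSeen.compareTo(releasedNode) >= 0)
                ? releasedNode
                : entries.next(lastSeen);
    }

    public static void main(String[] args) {
        PriorityList q = new PriorityList();
        Entry p0msg1 = q.add(0, "P0 msg1");   // delivered first, becomes lastSeen
        Entry p4msg = q.add(4, "P4 msg");     // arrives later at a higher priority

        Entry chosen = selectNext(q, p0msg1, p4msg);
        System.out.println(chosen == null ? "nothing to deliver" : chosen.name);
    }
}
```

In this model the P4 message has a later arrival id than the P0 entry, so the comparison fails and the walk starts from P0, where nothing lower exists. Whether the real broker orders entries the same way is an assumption made here to match the behaviour reported above.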
