You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by frankie_hr <fr...@2e-systems.com> on 2015/12/09 13:55:03 UTC

Selective consuming of priority messages with message groups

Hi All,

I'm having issues with a little piece of the code which is supposed to
selectively consume the messages from a queue, using the message priority as
the selector value. In addition to that, the grouping of messages is also
being used.

The prioritization policy is defined for the broker:

        PolicyEntry policy = new PolicyEntry();
        policy.setQueue(">");
        policy.setPrioritizedMessages(true);

The route is as follows:

        from("jms:queue:test?concurrentConsumers=1&selector=JMSPriority>=4")
            .bean(Processor.class, "process")
            .to("mock:result");

        from("jms:queue:test?concurrentConsumers=1&selector=JMSPriority<4")
            .bean(Processor.class, "process")
            .to("mock:result");

The test messages are pushed to the queue as follows:

        List<Message> messagesToSend = new ArrayList<>();
        Message message;

        message = new Message(1);
        message.setGroup("1");
        message.setPriority(0);
        messagesToSend.add(message);

        message = new Message(2);
        message.setGroup("1");
        message.setPriority(9);
        messagesToSend.add(message);

        message = new Message(3);
        message.setGroup("1");
        message.setPriority(0);
        messagesToSend.add(message);

        for (Message messageToSend : messagesToSend) {

            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put(JMS_GROUP_ID_HEADER, messageToSend.getGroup());


            headers.put(JMS_PRIORITY_HEADER, messageToSend.getPriority());
            template.sendBodyAndHeaders("jms:queue:test3", messageToSend,
headers);
        }

The behavior I expect is the following:

- Message 1 is pushed to the queue.
- Message 1 is consumed by the thread #1.
- Message 2 is pushed to the queue.
- Message 2 is consumed by the thread #2 (because of priority based
selector).
- Message 3 is pushed to the queue.
- Message 3 is consumed by the thread #1.

Instead, the behavior I get is the following:

- Message 1 is pushed to the queue.
- Message 1 is consumed by the thread #1.
- Message 2 is pushed to the queue.
- Message 2 sits on the queue, not being consumed by any thread.
- Message 3 is pushed to the queue.
- Message 3 is consumed by the thread #1.

The general overview of the situation is that only the messages matching the
priority of the first message pushed to the queue will be processed,
regardless of the order they're pushed in, while the messages with priority
falling into the other side of the selector will not be consumed at all and
will stay sitting on the queue.

The version of AMQ broker I'm using is 5.10.

So, does this seems to you like a bug or a feature I'm not able to
understand? All input on the topic is highly welcome and appreciated!

Regards,
Frankie



--
View this message in context: http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Selective consuming of priority messages with message groups

Posted by Tim Bain <tb...@alumni.duke.edu>.
I think that's exactly what's going on.  I didn't notice that line in your
code when you first posted it; sorry!
On Dec 10, 2015 7:28 AM, "frankie_hr" <fr...@2e-systems.com> wrote:

> Hi All,
>
> Here's an update to the point Three:
>
> > Third, use a JMX viewer such as JConsole to look at the subscriptions on
> > that queue.  Find the subscription that isn't working (#1) and see
> whether
> > it's present and whether its selector looks correct.  See whether any
> > messages have been dispatched to it.
>
> I did try connecting with the jConsole to the local AMQ broker and indeed
> there are two Consumers for the queue: one with the selector
> "JMSPriority>=4" and one with "JMSPriority<4". After the messages were
> pushed to the queue, the second Consumer shows dispatch, as well as dequeue
> count of 2, while the first one shows a 0 for both of the values.
>
> My suspicion is that message groups have something to do with it. It seems
> as if a Thread #n was dedicated to process the message 1, and then (because
> of the same message group) it wanted to process the message 2 as well, but
> it couldn't because the message 2 was supposed to be processed by a
> different consumer because of the selector. Once the message 3 arrives
> (also
> having the same group), it was processed by Thread #n, since the consumer
> was the same as for message 1.
>
> Does this make any sense? Any comments?
>
> Regards,
> Frankie
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704850.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

Re: Selective consuming of priority messages with message groups

Posted by artnaseef <ar...@artnaseef.com>.
There are two things that happen with message groups that makes them a poor
match for use with selectors.

First, the default implementation of message groups uses "buckets" of
assignments for groups-to-consumers.  This means that multiple groups
actually get assigned to the same consumer because the group name shares the
same hash.  SO, it is entirely possible to have a message group assigned to
a consumer whose selector does not match.

Second, even if that were not the case, all messages within a single group
are dedicated to a single consumer, based on the group name.  Any messages
that do not match that consumers selector but are in the same group will sit
unconsumed, filling up the max-page-size limit (defaults to 200).



--
View this message in context: http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704937.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Selective consuming of priority messages with message groups

Posted by frankie_hr <fr...@2e-systems.com>.
Hi All,

Here's an update to the point Three:

> Third, use a JMX viewer such as JConsole to look at the subscriptions on 
> that queue.  Find the subscription that isn't working (#1) and see whether 
> it's present and whether its selector looks correct.  See whether any 
> messages have been dispatched to it. 

I did try connecting with the jConsole to the local AMQ broker and indeed
there are two Consumers for the queue: one with the selector
"JMSPriority>=4" and one with "JMSPriority<4". After the messages were
pushed to the queue, the second Consumer shows dispatch, as well as dequeue
count of 2, while the first one shows a 0 for both of the values.

My suspicion is that message groups have something to do with it. It seems
as if a Thread #n was dedicated to process the message 1, and then (because
of the same message group) it wanted to process the message 2 as well, but
it couldn't because the message 2 was supposed to be processed by a
different consumer because of the selector. Once the message 3 arrives (also
having the same group), it was processed by Thread #n, since the consumer
was the same as for message 1.

Does this make any sense? Any comments?

Regards,
Frankie



--
View this message in context: http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704850.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Selective consuming of priority messages with message groups

Posted by frankie_hr <fr...@2e-systems.com>.
Hi Tim,

Thanks a lot for the reply and the suggestions. Please find some of my
comments inline.

Regards,
Frankie


Tim Bain wrote
> First, shouldn't you be expecting thread 2 to consume message 1?
> 
> [Frankie] The thread numbers were symbolic only. In real life the numbers
> would be something like Thread #11 and Thread #19. What I meant was that
> whichever thread is consuming from the second route will consume the
> messages 1 and 3.
> 
> Second, I'm guessing you're defining the routes in an XML file, in which
> case don't you need to encode "<" as "&lt;" and ">" as "&gt;"?
> 
> [Frankie] The routes are defined through the Java code, by using "<" and
> ">" instead of "&lt;" and "&gt;"
> 
> Third, use a JMX viewer such as JConsole to look at the subscriptions on
> that queue.  Find the subscription that isn't working (#1) and see whether
> it's present and whether its selector looks correct.  See whether any
> messages have been dispatched to it.
> 
> [Frankie] Will try that.
> 
> Fourth, use the web console to browse the queue.  Look at the message that
> isn't getting consumed and see if the headers look correct.
> 
> [Frankie] Checked the web console for the local AMQ broker and the message
> remained sitting on the queue. Will re-check how the headers are looking,
> even though not sure why it wouldn't work because it's explicitly set in a
> same way as it is for low priority messages.
> 
> Tim





--
View this message in context: http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704818.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Selective consuming of priority messages with message groups

Posted by Tim Bain <tb...@alumni.duke.edu>.
First, shouldn't you be expecting thread 2 to consume message 1?

Second, I'm guessing you're defining the routes in an XML file, in which
case don't you need to encode "<" as "&lt;" and ">" as "&gt;"?

Third, use a JMX viewer such as JConsole to look at the subscriptions on
that queue.  Find the subscription that isn't working (#1) and see whether
it's present and whether its selector looks correct.  See whether any
messages have been dispatched to it.

Fourth, use the web console to browse the queue.  Look at the message that
isn't getting consumed and see if the headers look correct.

Tim
On Dec 9, 2015 6:12 AM, "frankie_hr" <fr...@2e-systems.com> wrote:

> Hi All,
>
> I'm having issues with a little piece of the code which is supposed to
> selectively consume the messages from a queue, using the message priority
> as
> the selector value. In addition to that, the grouping of messages is also
> being used.
>
> The prioritization policy is defined for the broker:
>
>         PolicyEntry policy = new PolicyEntry();
>         policy.setQueue(">");
>         policy.setPrioritizedMessages(true);
>
> The route is as follows:
>
>
> from("jms:queue:test?concurrentConsumers=1&selector=JMSPriority>=4")
>             .bean(Processor.class, "process")
>             .to("mock:result");
>
>         from("jms:queue:test?concurrentConsumers=1&selector=JMSPriority<4")
>             .bean(Processor.class, "process")
>             .to("mock:result");
>
> The test messages are pushed to the queue as follows:
>
>         List<Message> messagesToSend = new ArrayList<>();
>         Message message;
>
>         message = new Message(1);
>         message.setGroup("1");
>         message.setPriority(0);
>         messagesToSend.add(message);
>
>         message = new Message(2);
>         message.setGroup("1");
>         message.setPriority(9);
>         messagesToSend.add(message);
>
>         message = new Message(3);
>         message.setGroup("1");
>         message.setPriority(0);
>         messagesToSend.add(message);
>
>         for (Message messageToSend : messagesToSend) {
>
>             Map<String, Object> headers = new HashMap<String, Object>();
>             headers.put(JMS_GROUP_ID_HEADER, messageToSend.getGroup());
>
>
>             headers.put(JMS_PRIORITY_HEADER, messageToSend.getPriority());
>             template.sendBodyAndHeaders("jms:queue:test3", messageToSend,
> headers);
>         }
>
> The behavior I expect is the following:
>
> - Message 1 is pushed to the queue.
> - Message 1 is consumed by the thread #1.
> - Message 2 is pushed to the queue.
> - Message 2 is consumed by the thread #2 (because of priority based
> selector).
> - Message 3 is pushed to the queue.
> - Message 3 is consumed by the thread #1.
>
> Instead, the behavior I get is the following:
>
> - Message 1 is pushed to the queue.
> - Message 1 is consumed by the thread #1.
> - Message 2 is pushed to the queue.
> - Message 2 sits on the queue, not being consumed by any thread.
> - Message 3 is pushed to the queue.
> - Message 3 is consumed by the thread #1.
>
> The general overview of the situation is that only the messages matching
> the
> priority of the first message pushed to the queue will be processed,
> regardless of the order they're pushed in, while the messages with priority
> falling into the other side of the selector will not be consumed at all and
> will stay sitting on the queue.
>
> The version of AMQ broker I'm using is 5.10.
>
> So, does this seems to you like a bug or a feature I'm not able to
> understand? All input on the topic is highly welcome and appreciated!
>
> Regards,
> Frankie
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

Re: Selective consuming of priority messages with message groups

Posted by Tim Bain <tb...@alumni.duke.edu>.
Also, if your consumers can't process all of the messages before the
low-priority ones age off, then you very clearly need more consumers, not
priority selectors.  Selecting on priority as you've done for the same
number of consumers will at best result in the same number of messages
consumed (and some messages aging off, some of which will now be
normal-priority messages, whereas without selectors they would only be
low-priority messages), and at worst you will have fewer messages consumed
(because there will probably be periods of time when there are no messages
of a given priority, when those consumers will sit idle).  If you can't
consume all your messages before messages age off, you need more consumers.

Tim
On Dec 15, 2015 6:58 AM, "Tim Bain" <tb...@alumni.duke.edu> wrote:

> You get all the things you described by continuing to use
> policy.setPrioritizedMessages(true) and not having selectors on priority.
> Normal-priority messages are still processed before low-priority messages
> when both are waiting, and messages in a group are still processed one at a
> time.  But you now have more consumers who can more quickly work on
> normal-priority messages to get through them quickly, and more consumers
> who can work on low-priority messages if you're lucky enough to get through
> all the normal-priority ones, since all consumers are now allowed to
> consume all messages.  Plus you no longer have the problem that made you
> ask for help in the first place, which is the goal anyway.
>
> What's the cost?  If there are no normal-priority messages in the queue
> (so all consumers are consuming low-priority messages) when a
> normal-priority message arrives, there is no longer a dedicated consumer
> sitting idle who can start processing it immediately, and you have to wait
> for the consumer for that message group to finish its current message
> before it picks up the new message.  But that seems like a perfectly
> acceptable downside in light of all the advantages, given that you haven't
> said that low-priority messages take an unreasonable amount of time to
> process, especially since your current implementation is broken anyway.
>
> Tim
> On Dec 14, 2015 11:34 AM, "artnaseef" <ar...@artnaseef.com> wrote:
>
>> To acheive the split between "high priority" and "low priority" without
>> starving the low-priority messages, I recommend:
>>
>> 1. use separate queues to split the priorities
>> 2. either use dedicated consumers for the low priorities that do not take
>> resources away from the higher priorities, or use some form of simple
>> heuristic to choose when to process the same (e.g. after processing 10
>> high-priority messages, process 1 low-priority message)
>>
>> The use of a heuristic will be important to allow the balance between
>> starving low-priority messages and delaying high-priority ones.
>>
>> Messaging features, such as priority queueing, are limited and not likely
>> to
>> give you the desired results.
>>
>> BTW, I may have missed it - why use message groups?  Is messaging order
>> important?
>>
>>
>>
>> --
>> View this message in context:
>> http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704938.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>

Re: Selective consuming of priority messages with message groups

Posted by frankie_hr <fr...@2e-systems.com>.
Hi guys,

Thank you all for the suggestions; you sure taught me some new stuff ;-)

I'll take those as a reference and will try to work out a new model for
processing the messages in order to avoid the problem this whole discussion
was started because in the first place.

Cheers,
Frankie



--
View this message in context: http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704975.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Selective consuming of priority messages with message groups

Posted by Tim Bain <tb...@alumni.duke.edu>.
You get all the things you described by continuing to use
policy.setPrioritizedMessages(true) and not having selectors on priority.
Normal-priority messages are still processed before low-priority messages
when both are waiting, and messages in a group are still processed one at a
time.  But you now have more consumers who can more quickly work on
normal-priority messages to get through them quickly, and more consumers
who can work on low-priority messages if you're lucky enough to get through
all the normal-priority ones, since all consumers are now allowed to
consume all messages.  Plus you no longer have the problem that made you
ask for help in the first place, which is the goal anyway.

What's the cost?  If there are no normal-priority messages in the queue (so
all consumers are consuming low-priority messages) when a normal-priority
message arrives, there is no longer a dedicated consumer sitting idle who
can start processing it immediately, and you have to wait for the consumer
for that message group to finish its current message before it picks up the
new message.  But that seems like a perfectly acceptable downside in light
of all the advantages, given that you haven't said that low-priority
messages take an unreasonable amount of time to process, especially since
your current implementation is broken anyway.

Tim
On Dec 14, 2015 11:34 AM, "artnaseef" <ar...@artnaseef.com> wrote:

> To acheive the split between "high priority" and "low priority" without
> starving the low-priority messages, I recommend:
>
> 1. use separate queues to split the priorities
> 2. either use dedicated consumers for the low priorities that do not take
> resources away from the higher priorities, or use some form of simple
> heuristic to choose when to process the same (e.g. after processing 10
> high-priority messages, process 1 low-priority message)
>
> The use of a heuristic will be important to allow the balance between
> starving low-priority messages and delaying high-priority ones.
>
> Messaging features, such as priority queueing, are limited and not likely
> to
> give you the desired results.
>
> BTW, I may have missed it - why use message groups?  Is messaging order
> important?
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704938.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

Re: Selective consuming of priority messages with message groups

Posted by artnaseef <ar...@artnaseef.com>.
To acheive the split between "high priority" and "low priority" without
starving the low-priority messages, I recommend:

1. use separate queues to split the priorities
2. either use dedicated consumers for the low priorities that do not take
resources away from the higher priorities, or use some form of simple
heuristic to choose when to process the same (e.g. after processing 10
high-priority messages, process 1 low-priority message)

The use of a heuristic will be important to allow the balance between
starving low-priority messages and delaying high-priority ones.

Messaging features, such as priority queueing, are limited and not likely to
give you the desired results.

BTW, I may have missed it - why use message groups?  Is messaging order
important?



--
View this message in context: http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704938.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Selective consuming of priority messages with message groups

Posted by frankie_hr <fr...@2e-systems.com>.
Hi Tim,

Thanks for the feedback.

The idea with the low and normal priority message separation arouse as a
consequence of the main business process in the application.

The application works in such a way that it is constantly fed by the normal
priority events, for which the processing time is of crucial importance. We
currently have a setting of 10 concurrent consumers on the main application
queue, which is just enough for the application to be able to chew all of
the events in time.

However, from time to time, we do receive a bulk of low important messages,
for which the processing time is not of such a great importance. By that, I
mean they should definitely yield to the normal priority ones, but should
not sit on the queue forever or until expired.

And this is exactly what happens: since the application is constantly under
the load of processing the normal important messages, whenever we get a bulk
of low priority ones, they very hardly seem to get the consumer to process
them, because all of the consumers are always busy with the normal priority
messages. This finally leads to a large portion of low priority messages
expiring from the queue after an hour.

As you remember me mentioning earlier, the grouping feature is also of great
importance, because all of the messages hold the information about a
resource that should never be accessed concurrently, which is why I was
trying to find a way to somehow give the low priority messages a chance to
be processed, without compromising the processing time of the normal
messages and while maintaining the exclusive access to the common resource.

I hope this makes it more clear now.

Do you have any ideas or suggestions on how to achieve that?

Regards,
Frankie



--
View this message in context: http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704927.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Selective consuming of priority messages with message groups

Posted by Tim Bain <tb...@alumni.duke.edu>.
Also, prioritizedMessages=true is not required for the JMS grouping
feature, as documented at http://activemq.apache.org/message-groups.html.
On Dec 11, 2015 6:53 AM, "Tim Bain" <tb...@alumni.duke.edu> wrote:

> I don't see why you need one high-priority consumer and one
> medium-priority consumer.  Using prioritizedMessages=true already ensures
> that the broker will hand your consumers the highest priority messages
> first; just leave it at that.
>
> Also,  separating high and low processing isn't a requirement (or at
> least, it's a dumb requirement); the requirement should be to do something
> that is meaningful and useful to a user of the system, and separating high
> and low processing would then be the design chosen for how to meet the
> requirement.  I'm skeptical that you have a "real-world" requirement that
> is best met by separating high and low processing, and so I think the
> requirement you've been given is one that shouldn't be met.  But I'm
> certainly open to being proven wrong if there really is such a "real-world"
> requirement for your system...
> On Dec 11, 2015 12:51 AM, "frankie_hr" <fr...@2e-systems.com>
> wrote:
>
>> Hi Jose,
>>
>> I'm using the prioritizedMessages=true because of the JMS grouping
>> feature.
>>
>> In the application I'm running, one of the most important features is that
>> messages with the same group ID are not processed in parallel, thus the
>> message grouping ensuring the same consumer always gets the message with
>> the
>> same ID, so that they are processed sequentially.
>>
>> The most recent requirement was to additionally separate processing of
>> messages with the low and high priority, but by still keeping the message
>> grouping rule defined above. What I meant to achieve that way was for
>> different consumers to process low and high priority message, with the
>> same
>> group ID, but while keeping the sequential message processing order.
>>
>> Any other ideas on how to achieve that?
>>
>> One other idea of mine was to define a route which would say something
>> like
>> this:
>>
>>                 from("jms:queue:test?concurrentConsumers=1")
>>                     .choice()
>>                     .when(header("JMSPriority").isGreaterThanOrEqualTo(4))
>>                         .to("jms:queue:normalPriority")
>>                     .otherwise()
>>                         .to("jms:queue:lowPriority")
>>                     .endChoice();
>>
>> However, in that case, the messages consumed from the normalPriority and
>> lowPriority queues might be processed in parallel, which I'm trying to
>> avoid.
>>
>> Regards,
>> Frankie
>>
>>
>> Jose María Zaragoza wrote
>> > A question about your code:
>> >
>> > why do you configure prioritizedMessages= true on your broker and also
>> > use consumers with a selector parameter ?
>> > I guess that to use consumers with a selector parameter is for
>> > avoiding to set prioritizedMessages= true
>> >
>> > With prioritizedMessages= true , the broker try to deliver the
>> > messages in order by priority
>> >
>> > It's only curiosity
>> >
>> > Regards
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704881.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>

Re: Selective consuming of priority messages with message groups

Posted by Tim Bain <tb...@alumni.duke.edu>.
I don't see why you need one high-priority consumer and one medium-priority
consumer.  Using prioritizedMessages=true already ensures that the broker
will hand your consumers the highest priority messages first; just leave it
at that.

Also,  separating high and low processing isn't a requirement (or at least,
it's a dumb requirement); the requirement should be to do something that is
meaningful and useful to a user of the system, and separating high and low
processing would then be the design chosen for how to meet the
requirement.  I'm skeptical that you have a "real-world" requirement that
is best met by separating high and low processing, and so I think the
requirement you've been given is one that shouldn't be met.  But I'm
certainly open to being proven wrong if there really is such a "real-world"
requirement for your system...
On Dec 11, 2015 12:51 AM, "frankie_hr" <fr...@2e-systems.com>
wrote:

> Hi Jose,
>
> I'm using the prioritizedMessages=true because of the JMS grouping feature.
>
> In the application I'm running, one of the most important features is that
> messages with the same group ID are not processed in parallel, thus the
> message grouping ensuring the same consumer always gets the message with
> the
> same ID, so that they are processed sequentially.
>
> The most recent requirement was to additionally separate processing of
> messages with the low and high priority, but by still keeping the message
> grouping rule defined above. What I meant to achieve that way was for
> different consumers to process low and high priority message, with the same
> group ID, but while keeping the sequential message processing order.
>
> Any other ideas on how to achieve that?
>
> One other idea of mine was to define a route which would say something like
> this:
>
>                 from("jms:queue:test?concurrentConsumers=1")
>                     .choice()
>                     .when(header("JMSPriority").isGreaterThanOrEqualTo(4))
>                         .to("jms:queue:normalPriority")
>                     .otherwise()
>                         .to("jms:queue:lowPriority")
>                     .endChoice();
>
> However, in that case, the messages consumed from the normalPriority and
> lowPriority queues might be processed in parallel, which I'm trying to
> avoid.
>
> Regards,
> Frankie
>
>
> Jose María Zaragoza wrote
> > A question about your code:
> >
> > why do you configure prioritizedMessages= true on your broker and also
> > use consumers with a selector parameter ?
> > I guess that to use consumers with a selector parameter is for
> > avoiding to set prioritizedMessages= true
> >
> > With prioritizedMessages= true , the broker try to deliver the
> > messages in order by priority
> >
> > It's only curiosity
> >
> > Regards
>
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704881.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

Re: Selective consuming of priority messages with message groups

Posted by frankie_hr <fr...@2e-systems.com>.
Hi Jose,

I'm using the prioritizedMessages=true because of the JMS grouping feature. 

In the application I'm running, one of the most important features is that
messages with the same group ID are not processed in parallel, thus the
message grouping ensuring the same consumer always gets the message with the
same ID, so that they are processed sequentially.

The most recent requirement was to additionally separate processing of
messages with the low and high priority, but by still keeping the message
grouping rule defined above. What I meant to achieve that way was for
different consumers to process low and high priority message, with the same
group ID, but while keeping the sequential message processing order.

Any other ideas on how to achieve that?

One other idea of mine was to define a route which would say something like
this:

                from("jms:queue:test?concurrentConsumers=1")
                    .choice()
                    .when(header("JMSPriority").isGreaterThanOrEqualTo(4))
                        .to("jms:queue:normalPriority")
                    .otherwise()
                        .to("jms:queue:lowPriority")
                    .endChoice();

However, in that case, the messages consumed from the normalPriority and
lowPriority queues might be processed in parallel, which I'm trying to
avoid.

Regards,
Frankie


Jose María Zaragoza wrote
> A question about your code:
> 
> why do you configure prioritizedMessages= true on your broker and also
> use consumers with a selector parameter ?
> I guess that to use consumers with a selector parameter is for
> avoiding to set prioritizedMessages= true
> 
> With prioritizedMessages= true , the broker try to deliver the
> messages in order by priority
> 
> It's only curiosity
> 
> Regards





--
View this message in context: http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813p4704881.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Selective consuming of priority messages with message groups

Posted by Jose María Zaragoza <de...@gmail.com>.
2015-12-09 13:55 GMT+01:00 frankie_hr <fr...@2e-systems.com>:
> Hi All,
>
> I'm having issues with a little piece of the code which is supposed to
> selectively consume the messages from a queue, using the message priority as
> the selector value. In addition to that, the grouping of messages is also
> being used.
>
> The prioritization policy is defined for the broker:
>
>         PolicyEntry policy = new PolicyEntry();
>         policy.setQueue(">");
>         policy.setPrioritizedMessages(true);
>
> The route is as follows:
>
>         from("jms:queue:test?concurrentConsumers=1&selector=JMSPriority>=4")
>             .bean(Processor.class, "process")
>             .to("mock:result");
>
>         from("jms:queue:test?concurrentConsumers=1&selector=JMSPriority<4")
>             .bean(Processor.class, "process")
>             .to("mock:result");
>
> The test messages are pushed to the queue as follows:
>
>         List<Message> messagesToSend = new ArrayList<>();
>         Message message;
>
>         message = new Message(1);
>         message.setGroup("1");
>         message.setPriority(0);
>         messagesToSend.add(message);
>
>         message = new Message(2);
>         message.setGroup("1");
>         message.setPriority(9);
>         messagesToSend.add(message);
>
>         message = new Message(3);
>         message.setGroup("1");
>         message.setPriority(0);
>         messagesToSend.add(message);
>
>         for (Message messageToSend : messagesToSend) {
>
>             Map<String, Object> headers = new HashMap<String, Object>();
>             headers.put(JMS_GROUP_ID_HEADER, messageToSend.getGroup());
>
>
>             headers.put(JMS_PRIORITY_HEADER, messageToSend.getPriority());
>             template.sendBodyAndHeaders("jms:queue:test3", messageToSend,
> headers);
>         }
>

A question about your code:

why do you configure prioritizedMessages= true on your broker and also
use consumers with a selector parameter ?
I guess that to use consumers with a selector parameter is for
avoiding to set prioritizedMessages= true

With prioritizedMessages= true , the broker try to deliver the
messages in order by priority

It's only curiosity

Regards

> The behavior I expect is the following:
>
> - Message 1 is pushed to the queue.
> - Message 1 is consumed by the thread #1.
> - Message 2 is pushed to the queue.
> - Message 2 is consumed by the thread #2 (because of priority based
> selector).
> - Message 3 is pushed to the queue.
> - Message 3 is consumed by the thread #1.
>
> Instead, the behavior I get is the following:
>
> - Message 1 is pushed to the queue.
> - Message 1 is consumed by the thread #1.
> - Message 2 is pushed to the queue.
> - Message 2 sits on the queue, not being consumed by any thread.
> - Message 3 is pushed to the queue.
> - Message 3 is consumed by the thread #1.
>
> The general overview of the situation is that only the messages matching the
> priority of the first message pushed to the queue will be processed,
> regardless of the order they're pushed in, while the messages with priority
> falling into the other side of the selector will not be consumed at all and
> will stay sitting on the queue.
>
> The version of AMQ broker I'm using is 5.10.
>
> So, does this seems to you like a bug or a feature I'm not able to
> understand? All input on the topic is highly welcome and appreciated!
>
> Regards,
> Frankie
>
>
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Selective-consuming-of-priority-messages-with-message-groups-tp4704813.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.