You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Mihai Osian <Mi...@gmail.com> on 2011/11/14 18:33:01 UTC

Message priority

  Hello, 

  I am trying to enable priority consumption and I can't seem to get it
working right. I am using ActiveMQ version 5.5.1 and I have followed the
instructions from
http://activemq.apache.org/how-can-i-support-priority-queues.html. 
  For testing I modified the example code that comes with ActiveMQ itself so
that the ProducerTool sends 180 messages with cycling priority
(0,1,..8,0,1,2...). After the producer finishes the job I start the consumer
and look at the order in which the messages are received. The first few
messages are sometimes received in an apparently random order, after which
they start coming in descending order of their priority, as I expect. 
  I have also discovered that if I set queuePrefetch="1" in the
"policyEntry" element from the XML configuration file I get an unsorted
list. Similarly, setting it to 3 (for example) results in the output list
being thoroughly randomized (messages are presumably sorted in groups of 3,
but not quite). 

  My questions are: 
1. Can somebody explain the results I see ? Am I missing something ? 
2. Is there a way to get the all messages fully sorted in descending order,
assuming one single consumer that is started after all messages are in the
queue ? 
3. The ActiveMQ documentation page (written for version 5.4) that I
mentioned above says that KahaDB supports 3 levels of priority (<5, 5, and
>5). However:
   3a. The first few messages that I get are sometimes low-priority (<5).
   3b. With a larger queuePrefetch most messages are fully sorted in
descending order, so it may be that the page is out of date and in the
latest version it is supposed to support the full priority range. Still, I
don't understand the output.

Other technical information: 
- ActiveMQ 5.5.1 (also tested 5.5.0, same behavior)
- activemq started from the command line with: 
   ./bin/activemq console xbean:conf/activemq-priority.xml
The activemq-priority.xml is a copy of the conf/activemq.xml, plus the
following entry: 
  <policyEntry queue="TEST.FOO" prioritizedMessages="true"
queuePrefetch="200" producerFlowControl="true"
memoryLimit="1mb"></policyEntry> 
- A modified version of the ActiveMQ example (also containing the
activemq-priority.xml file) can be found here: 
http://www.mediafire.com/?z7u5fhhlfbt3ano , if anybody wants to check the
code. 

=========================================
An example output copy-pasted from my console: 

mike@dell:~/tmp/apache-activemq-5.5.0/example$ ant producer
Buildfile: /home/mike/tmp/apache-activemq-5.5.0/example/build.xml

init:

compile:
    [javac] /home/mike/tmp/apache-activemq-5.5.0/example/build.xml:152:
warning: 'includeantruntime' was not set, defaulting to
build.sysclasspath=last; set to false for repeatable builds

producer:
     [echo] Running producer against server at $url = tcp://localhost:61616
for subject $subject = TEST.FOO
     [java] Connecting to URL: tcp://localhost:61616
     [java] Publishing a Message with size 1000 to queue: TEST.FOO
     [java] Using non-persistent messages
     [java] Sleeping between publish 0 ms
     [java] Running 1 parallel threads
     [java] [Thread-1] Sending message: 'Message with priority: 0 index: 0
sent at: Mon Nov...' with priority 0
     [java] [Thread-1] Sending message: 'Message with priority: 1 index: 1
sent at: Mon Nov...' with priority 1
     [java] [Thread-1] Sending message: 'Message with priority: 2 index: 2
sent at: Mon Nov...' with priority 2
     [java] [Thread-1] Sending message: 'Message with priority: 3 index: 3
sent at: Mon Nov...' with priority 3
     [java] [Thread-1] Sending message: 'Message with priority: 4 index: 4
sent at: Mon Nov...' with priority 4
     [java] [Thread-1] Sending message: 'Message with priority: 5 index: 5
sent at: Mon Nov...' with priority 5
     [java] [Thread-1] Sending message: 'Message with priority: 6 index: 6
sent at: Mon Nov...' with priority 6
     [java] [Thread-1] Sending message: 'Message with priority: 7 index: 7
sent at: Mon Nov...' with priority 7
     [java] [Thread-1] Sending message: 'Message with priority: 8 index: 8
sent at: Mon Nov...' with priority 8
     [java] [Thread-1] Sending message: 'Message with priority: 0 index: 9
sent at: Mon Nov...' with priority 0
     [java] [Thread-1] Sending message: 'Message with priority: 1 index: 10
sent at: Mon No...' with priority 1
     [java] [Thread-1] Sending message: 'Message with priority: 2 index: 11
sent at: Mon No...' with priority 2
     [java] [Thread-1] Sending message: 'Message with priority: 3 index: 12
sent at: Mon No...' with priority 3
     [java] [Thread-1] Sending message: 'Message with priority: 4 index: 13
sent at: Mon No...' with priority 4
     [java] [Thread-1] Sending message: 'Message with priority: 5 index: 14
sent at: Mon No...' with priority 5
     [java] [Thread-1] Sending message: 'Message with priority: 6 index: 15
sent at: Mon No...' with priority 6
    ...

mike@dell:~/tmp/apache-activemq-5.5.0/example$ ant consumer 
Buildfile: /home/mike/tmp/apache-activemq-5.5.0/example/build.xml

init:

compile:
    [javac] /home/mike/tmp/apache-activemq-5.5.0/example/build.xml:152:
warning: 'includeantruntime' was not set, defaulting to
build.sysclasspath=last; set to false for repeatable builds

consumer:
     [echo] Running consumer against server at $url = tcp://localhost:61616
for subject $subject = TEST.FOO
     [java] Connecting to URL: tcp://localhost:61616
     [java] Consuming queue: TEST.FOO
     [java] Using a non-durable subscription
     [java] JMS Selector 
     [java] Running 1 parallel threads
     [java] [Thread-1] We are about to wait until we consume: 180 message(s)
then we will shutdown
     [java] [Thread-1] Received: 'Message with priority: 7 index: 7 sent at:
Mon Nov...' (length 1000)
     [java] [Thread-1] Received: 'Message with priority: 8 index: 8 sent at:
Mon Nov...' (length 1000)
     [java] [Thread-1] Received: 'Message with priority: 8 index: 17 sent
at: Mon No...' (length 1000)
     [java] [Thread-1] Received: 'Message with priority: 7 index: 16 sent
at: Mon No...' (length 1000)
     [java] [Thread-1] Received: 'Message with priority: 6 index: 6 sent at:
Mon Nov...' (length 1000)
     [java] [Thread-1] Received: 'Message with priority: 6 index: 15 sent
at: Mon No...' (length 1000)
     [java] [Thread-1] Received: 'Message with priority: 5 index: 5 sent at:
Mon Nov...' (length 1000)
    ....
======================

Thanks in advance, 
Mihai

http://activemq.2283324.n4.nabble.com/file/n4040260/example.tgz example.tgz 


--
View this message in context: http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4040260.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Message priority

Posted by Gary Tully <ga...@gmail.com>.
hmm. that is a problem. If you use persistent delivery,
DeliveryMode.PERSISTENT or pass -Ddurable=true to the ant producer
task, the priority order will be respected.

I ran a quick variant of the
org.apache.activemq.store.MessagePriorityTest.testQueues from
activemq-core with non persistent delivery and it fails.
Can you raise a jira issue to track this. It needs a bit of
investigation to see why sorting is being bypassed for non persistent
messages.

On 14 November 2011 17:33, Mihai Osian <Mi...@gmail.com> wrote:
>  Hello,
>
>  I am trying to enable priority consumption and I can't seem to get it
> working right. I am using ActiveMQ version 5.5.1 and I have followed the
> instructions from
> http://activemq.apache.org/how-can-i-support-priority-queues.html.
>  For testing I modified the example code that comes with ActiveMQ itself so
> that the ProducerTool sends 180 messages with cycling priority
> (0,1,..8,0,1,2...). After the producer finishes the job I start the consumer
> and look at the order in which the messages are received. The first few
> messages are sometimes received in an apparently random order, after which
> they start coming in descending order of their priority, as I expect.
>  I have also discovered that if I set queuePrefetch="1" in the
> "policyEntry" element from the XML configuration file I get an unsorted
> list. Similarly, setting it to 3 (for example) results in the output list
> being thoroughly randomized (messages are presumably sorted in groups of 3,
> but not quite).
>
>  My questions are:
> 1. Can somebody explain the results I see ? Am I missing something ?
> 2. Is there a way to get the all messages fully sorted in descending order,
> assuming one single consumer that is started after all messages are in the
> queue ?
> 3. The ActiveMQ documentation page (written for version 5.4) that I
> mentioned above says that KahaDB supports 3 levels of priority (<5, 5, and
>>5). However:
>   3a. The first few messages that I get are sometimes low-priority (<5).
>   3b. With a larger queuePrefetch most messages are fully sorted in
> descending order, so it may be that the page is out of date and in the
> latest version it is supposed to support the full priority range. Still, I
> don't understand the output.
>
> Other technical information:
> - ActiveMQ 5.5.1 (also tested 5.5.0, same behavior)
> - activemq started from the command line with:
>   ./bin/activemq console xbean:conf/activemq-priority.xml
> The activemq-priority.xml is a copy of the conf/activemq.xml, plus the
> following entry:
>  <policyEntry queue="TEST.FOO" prioritizedMessages="true"
> queuePrefetch="200" producerFlowControl="true"
> memoryLimit="1mb"></policyEntry>
> - A modified version of the ActiveMQ example (also containing the
> activemq-priority.xml file) can be found here:
> http://www.mediafire.com/?z7u5fhhlfbt3ano , if anybody wants to check the
> code.
>
> =========================================
> An example output copy-pasted from my console:
>
> mike@dell:~/tmp/apache-activemq-5.5.0/example$ ant producer
> Buildfile: /home/mike/tmp/apache-activemq-5.5.0/example/build.xml
>
> init:
>
> compile:
>    [javac] /home/mike/tmp/apache-activemq-5.5.0/example/build.xml:152:
> warning: 'includeantruntime' was not set, defaulting to
> build.sysclasspath=last; set to false for repeatable builds
>
> producer:
>     [echo] Running producer against server at $url = tcp://localhost:61616
> for subject $subject = TEST.FOO
>     [java] Connecting to URL: tcp://localhost:61616
>     [java] Publishing a Message with size 1000 to queue: TEST.FOO
>     [java] Using non-persistent messages
>     [java] Sleeping between publish 0 ms
>     [java] Running 1 parallel threads
>     [java] [Thread-1] Sending message: 'Message with priority: 0 index: 0
> sent at: Mon Nov...' with priority 0
>     [java] [Thread-1] Sending message: 'Message with priority: 1 index: 1
> sent at: Mon Nov...' with priority 1
>     [java] [Thread-1] Sending message: 'Message with priority: 2 index: 2
> sent at: Mon Nov...' with priority 2
>     [java] [Thread-1] Sending message: 'Message with priority: 3 index: 3
> sent at: Mon Nov...' with priority 3
>     [java] [Thread-1] Sending message: 'Message with priority: 4 index: 4
> sent at: Mon Nov...' with priority 4
>     [java] [Thread-1] Sending message: 'Message with priority: 5 index: 5
> sent at: Mon Nov...' with priority 5
>     [java] [Thread-1] Sending message: 'Message with priority: 6 index: 6
> sent at: Mon Nov...' with priority 6
>     [java] [Thread-1] Sending message: 'Message with priority: 7 index: 7
> sent at: Mon Nov...' with priority 7
>     [java] [Thread-1] Sending message: 'Message with priority: 8 index: 8
> sent at: Mon Nov...' with priority 8
>     [java] [Thread-1] Sending message: 'Message with priority: 0 index: 9
> sent at: Mon Nov...' with priority 0
>     [java] [Thread-1] Sending message: 'Message with priority: 1 index: 10
> sent at: Mon No...' with priority 1
>     [java] [Thread-1] Sending message: 'Message with priority: 2 index: 11
> sent at: Mon No...' with priority 2
>     [java] [Thread-1] Sending message: 'Message with priority: 3 index: 12
> sent at: Mon No...' with priority 3
>     [java] [Thread-1] Sending message: 'Message with priority: 4 index: 13
> sent at: Mon No...' with priority 4
>     [java] [Thread-1] Sending message: 'Message with priority: 5 index: 14
> sent at: Mon No...' with priority 5
>     [java] [Thread-1] Sending message: 'Message with priority: 6 index: 15
> sent at: Mon No...' with priority 6
>    ...
>
> mike@dell:~/tmp/apache-activemq-5.5.0/example$ ant consumer
> Buildfile: /home/mike/tmp/apache-activemq-5.5.0/example/build.xml
>
> init:
>
> compile:
>    [javac] /home/mike/tmp/apache-activemq-5.5.0/example/build.xml:152:
> warning: 'includeantruntime' was not set, defaulting to
> build.sysclasspath=last; set to false for repeatable builds
>
> consumer:
>     [echo] Running consumer against server at $url = tcp://localhost:61616
> for subject $subject = TEST.FOO
>     [java] Connecting to URL: tcp://localhost:61616
>     [java] Consuming queue: TEST.FOO
>     [java] Using a non-durable subscription
>     [java] JMS Selector
>     [java] Running 1 parallel threads
>     [java] [Thread-1] We are about to wait until we consume: 180 message(s)
> then we will shutdown
>     [java] [Thread-1] Received: 'Message with priority: 7 index: 7 sent at:
> Mon Nov...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 8 index: 8 sent at:
> Mon Nov...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 8 index: 17 sent
> at: Mon No...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 7 index: 16 sent
> at: Mon No...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 6 index: 6 sent at:
> Mon Nov...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 6 index: 15 sent
> at: Mon No...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 5 index: 5 sent at:
> Mon Nov...' (length 1000)
>    ....
> ======================
>
> Thanks in advance,
> Mihai
>
> http://activemq.2283324.n4.nabble.com/file/n4040260/example.tgz example.tgz
>
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4040260.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
http://fusesource.com
http://blog.garytully.com

Re: Message priority

Posted by metatech <me...@gmail.com>.
gtully wrote
> policyEntry queue="xxx" useCache="false"
> expireMessagesPeriod="0"queuePrefetch="1"

Gary,Even tough we configured our queue using all these options, we still
have problems when a JDBC persistence adapter is used : 1. sometimes low
priority messages are consumed although there are thousands of high priority
messages in the queue.2. sometimes high priority message consumption is
"frozen" until broker is restarted.We created a JIRA ticket (AMQ-4489) with
unit test drivers which reproduce these problems.Could you have a look
please ?Thanks in advance,metatech



--
View this message in context: http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4666385.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply:Re: Reply:Re: Message priority

Posted by SuoNayi <su...@163.com>.
Hi, do you get it work?
You need comment out the vmcursor in the queue policy and the whole policy for the queue
looks like this,
<policyEntry queue="YOUR_prioritized_QUEUE_NAME" producerFlowControl="false" 
prioritizedMessages="true" useCache="false" queuePrefetch="1" expireMessagesPeriod="0"/>
I have tested it and it does work but performance penalty becomes arrestive.


At 2013-01-07 22:07:11,"Gary Tully" <ga...@gmail.com> wrote:
>your important message is in the store, but not in the current memory
>snapshot.
>Unless we keep all messages in memory (vmcursor), we need to carefully
>configure the store cursors use of memory.
>
>There are two problems here.
>
>The first is caching messages on send by the store cursor.
>If there is a large batch of low priority messages that cause the cache to
>be exhausted, followed by some high priority messages, the high priority
>messages will only be read and dispatched when the cache is depleted.
>Currently it is not possible for a cached low priority message in memory to
>be replaced by a high priority message.
>The workaround for this is to disable the cursor cache.
><policyEntry queue="xxx" useCache="false" ..>
>
>The second is paging into memory from the store, to ensure true priority
>order this must occur only for dispatch to consumers. By default there is a
>periodic message scan to check for priority. This must be disabled,
>otherwise a consumer will get a view of the store order as seen by the last
>scan rather than at the time it subscribes.
>
>The workaround for this is to disable async expiry processing:
><policyEntry queue="xxx" useCache="false" expireMessagesPeriod="0" >
>
>Lastly, if high priory messages are a must have for each consumer, using
>prefetch=1 is necessary, such that every consumer dispatch goes to the
>store to find the current highest priority message. Otherwise a batch of
>priority ordered messages will be in memory ready to dispatch, irrespective
>of new deliveries.
>
><policyEntry queue="xxx" useCache="false" expireMessagesPeriod="0"
>queuePrefetch="1" >
>
>It is a trade off between taking a snapshot and caching or getting the
>latest up-to-date information w.r.t to producers.
>
>For active durable topic subscribers we support immediate priority
>dispatch, where we bypass the cache if we have a higher priority message
>and dispatch it immediately to all subscribers.
>We may need to investigate a similar arrangement for queues.
>
>
>On 7 January 2013 07:40, xawiers <xa...@gmail.com> wrote:
>
>> According to SuoNayi, amq will respect the message priority order if
>> vmcursor
>> is used.
>> If I will use default cursor - I will not get message priority. And this is
>> bad. Message priority is very important to me.
>> I have tried with storage cursor also:
>> One producer, one consumer
>>
>> added 10 messages with priority 3
>> added 3 messages with priority 5
>> added 100000 messages with priority 3
>> added 3 messages with priority 5
>>
>> consumed 3 messages (all priority 5), consuming 4th message I got message
>> with priority 3. Where is my important message ?
>>
>>
>>
>> --
>> View this message in context:
>> http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4661442.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>
>
>
>-- 
>http://redhat.com
>http://blog.garytully.com

Re: Reply:Re: Message priority

Posted by Gary Tully <ga...@gmail.com>.
your important message is in the store, but not in the current memory
snapshot.
Unless we keep all messages in memory (vmcursor), we need to carefully
configure the store cursors use of memory.

There are two problems here.

The first is caching messages on send by the store cursor.
If there is a large batch of low priority messages that cause the cache to
be exhausted, followed by some high priority messages, the high priority
messages will only be read and dispatched when the cache is depleted.
Currently it is not possible for a cached low priority message in memory to
be replaced by a high priority message.
The workaround for this is to disable the cursor cache.
<policyEntry queue="xxx" useCache="false" ..>

The second is paging into memory from the store, to ensure true priority
order this must occur only for dispatch to consumers. By default there is a
periodic message scan to check for priority. This must be disabled,
otherwise a consumer will get a view of the store order as seen by the last
scan rather than at the time it subscribes.

The workaround for this is to disable async expiry processing:
<policyEntry queue="xxx" useCache="false" expireMessagesPeriod="0" >

Lastly, if high priory messages are a must have for each consumer, using
prefetch=1 is necessary, such that every consumer dispatch goes to the
store to find the current highest priority message. Otherwise a batch of
priority ordered messages will be in memory ready to dispatch, irrespective
of new deliveries.

<policyEntry queue="xxx" useCache="false" expireMessagesPeriod="0"
queuePrefetch="1" >

It is a trade off between taking a snapshot and caching or getting the
latest up-to-date information w.r.t to producers.

For active durable topic subscribers we support immediate priority
dispatch, where we bypass the cache if we have a higher priority message
and dispatch it immediately to all subscribers.
We may need to investigate a similar arrangement for queues.


On 7 January 2013 07:40, xawiers <xa...@gmail.com> wrote:

> According to SuoNayi, amq will respect the message priority order if
> vmcursor
> is used.
> If I will use default cursor - I will not get message priority. And this is
> bad. Message priority is very important to me.
> I have tried with storage cursor also:
> One producer, one consumer
>
> added 10 messages with priority 3
> added 3 messages with priority 5
> added 100000 messages with priority 3
> added 3 messages with priority 5
>
> consumed 3 messages (all priority 5), consuming 4th message I got message
> with priority 3. Where is my important message ?
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4661442.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
http://redhat.com
http://blog.garytully.com

Re: Reply:Re: Message priority

Posted by xawiers <xa...@gmail.com>.
On Mon, Jan 7, 2013 at 9:40 AM, xawiers [via ActiveMQ]
<ml...@n4.nabble.com> wrote:
> According to SuoNayi, amq will respect the message priority order if
> vmcursor is used.
> If I will use default cursor - I will not get message priority. And this is
> bad. Message priority is very important to me.
> I have tried with storage cursor also:
> One producer, one consumer
>
> added 10 messages with priority 3
> added 3 messages with priority 5
> added 100000 messages with priority 3
> added 3 messages with priority 5
>
> consumed 3 messages (all priority 5), consuming 4th message I got message
> with priority 3. Where is my important message ?
>
> ________________________________
> If you reply to this email, your message will be added to the discussion
> below:
> http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4661442.html
> To unsubscribe from Message priority, click here.
> NAML




--
View this message in context: http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4661443.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Reply:Re: Reply:Re: Message priority

Posted by xawiers <xa...@gmail.com>.
My Config does not use cache and set prefetch size to zero (found on other
forums as recommendations)
I don't use topics, so don't play attention to it.


I also searched how to minimize batch message size to keep memory cursor
small, have not found working results. In my case receiving importand
message is critical and it can't wait till next batch query.
amq could check if produced message has higher priority than in cursor's
message list (let's say last message, because it should be kept in priority
order) and if it's true - add it and remove last in queue (which will be
with lower priority). This is just my thoughts.
So what settings could be best ?
Add memory limit on queue?
How to reduce cursor's waiting message amount size?
Performance is not very much important at this point. Consumer is slower
than amq.



--
View this message in context: http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4661453.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply:Re: Reply:Re: Message priority

Posted by SuoNayi <su...@163.com>.
Well,let's explore the message priority mechanism inside the broker.
The message sent from a producer will be persistent in the message store at first,
then it will be cached in the message cursor if the memory for the cursor is available.
That cursor is basically a priority list which supports sorting messages by priority.
When messages in the message cursor are all consumed by consumers the broker will
read the next batch messages by message priority from the store.
So if the space of  the message cursor is very small or disabling the cache capability 
of the message cursor the cursor will not be able to cache messages and the broker 
will always read the next batch messages by message priority from the store.
In this case the broker behaves as your issuing the statement as the following in the database,
SELECT * FROM MESSAGE_TABLE ORDER BY priority.
Of course this approach pays some price on peformance penalty or you can consider 
using different queues to deliver different priority messages.



At 2013-01-07 15:40:13,xawiers <xa...@gmail.com> wrote:
>According to SuoNayi, amq will respect the message priority order if vmcursor
>is used.
>If I will use default cursor - I will not get message priority. And this is
>bad. Message priority is very important to me.
>I have tried with storage cursor also:
>One producer, one consumer
>
>added 10 messages with priority 3
>added 3 messages with priority 5
>added 100000 messages with priority 3
>added 3 messages with priority 5
>
>consumed 3 messages (all priority 5), consuming 4th message I got message
>with priority 3. Where is my important message ?
>
>
>
>--
>View this message in context: http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4661442.html
>Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Reply:Re: Message priority

Posted by xawiers <xa...@gmail.com>.
According to SuoNayi, amq will respect the message priority order if vmcursor
is used.
If I will use default cursor - I will not get message priority. And this is
bad. Message priority is very important to me.
I have tried with storage cursor also:
One producer, one consumer

added 10 messages with priority 3
added 3 messages with priority 5
added 100000 messages with priority 3
added 3 messages with priority 5

consumed 3 messages (all priority 5), consuming 4th message I got message
with priority 3. Where is my important message ?



--
View this message in context: http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4661442.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Reply:Re: Message priority

Posted by Gary Tully <ga...@gmail.com>.
use the default (store) cursor. It will just buffer messages up to 70% of
the memory limit so there is no need to block.



On 2 January 2013 18:43, xawiers <xa...@gmail.com> wrote:

> Anyway. Tested with 5.7.0 and 5.5.1:
> if define vmcursor and lower memoryUsage (limit 64mb)
> then memory fills up adding more and more messages and then producer is
> stopped.
>
> is here any setting to have not big memory usage and small bach size to
> fetch messages from persistence DB (I use mysql) and preserve priorities?
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4661227.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
http://redhat.com
http://blog.garytully.com

Re: Reply:Re: Message priority

Posted by xawiers <xa...@gmail.com>.
Anyway. Tested with 5.7.0 and 5.5.1:
if define vmcursor and lower memoryUsage (limit 64mb)
then memory fills up adding more and more messages and then producer is
stopped.

is here any setting to have not big memory usage and small bach size to
fetch messages from persistence DB (I use mysql) and preserve priorities?



--
View this message in context: http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4661227.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply:Re: Message priority

Posted by SuoNayi <su...@163.com>.
amq will respect the message priority order if vmcursor is used.
on the other hand  it will not completely respect that order as default messagecursor is used and consumers are slower than producer,because message are read from the store and  ordered and at last sent to consumers in batches.
I  donot know whether this is correct and expect Gary show the way.




At 2011-11-15 20:24:01,"Gary Tully" <ga...@gmail.com> wrote:
>The fix will be in 5.6, have a peek at
>https://issues.apache.org/jira/browse/AMQ-3596, you can validate it in
>tonight's 5.6-SNAPSHOT or try one of the workarounds.
>
>On 14 November 2011 17:33, Mihai Osian <Mi...@gmail.com> wrote:
>>  Hello,
>>
>>  I am trying to enable priority consumption and I can't seem to get it
>> working right. I am using ActiveMQ version 5.5.1 and I have followed the
>> instructions from
>> http://activemq.apache.org/how-can-i-support-priority-queues.html.
>>  For testing I modified the example code that comes with ActiveMQ itself so
>> that the ProducerTool sends 180 messages with cycling priority
>> (0,1,..8,0,1,2...). After the producer finishes the job I start the consumer
>> and look at the order in which the messages are received. The first few
>> messages are sometimes received in an apparently random order, after which
>> they start coming in descending order of their priority, as I expect.
>>  I have also discovered that if I set queuePrefetch="1" in the
>> "policyEntry" element from the XML configuration file I get an unsorted
>> list. Similarly, setting it to 3 (for example) results in the output list
>> being thoroughly randomized (messages are presumably sorted in groups of 3,
>> but not quite).
>>
>>  My questions are:
>> 1. Can somebody explain the results I see ? Am I missing something ?
>> 2. Is there a way to get the all messages fully sorted in descending order,
>> assuming one single consumer that is started after all messages are in the
>> queue ?
>> 3. The ActiveMQ documentation page (written for version 5.4) that I
>> mentioned above says that KahaDB supports 3 levels of priority (<5, 5, and
>>>5). However:
>>   3a. The first few messages that I get are sometimes low-priority (<5).
>>   3b. With a larger queuePrefetch most messages are fully sorted in
>> descending order, so it may be that the page is out of date and in the
>> latest version it is supposed to support the full priority range. Still, I
>> don't understand the output.
>>
>> Other technical information:
>> - ActiveMQ 5.5.1 (also tested 5.5.0, same behavior)
>> - activemq started from the command line with:
>>   ./bin/activemq console xbean:conf/activemq-priority.xml
>> The activemq-priority.xml is a copy of the conf/activemq.xml, plus the
>> following entry:
>>  <policyEntry queue="TEST.FOO" prioritizedMessages="true"
>> queuePrefetch="200" producerFlowControl="true"
>> memoryLimit="1mb"></policyEntry>
>> - A modified version of the ActiveMQ example (also containing the
>> activemq-priority.xml file) can be found here:
>> http://www.mediafire.com/?z7u5fhhlfbt3ano , if anybody wants to check the
>> code.
>>
>> =========================================
>> An example output copy-pasted from my console:
>>
>> mike@dell:~/tmp/apache-activemq-5.5.0/example$ ant producer
>> Buildfile: /home/mike/tmp/apache-activemq-5.5.0/example/build.xml
>>
>> init:
>>
>> compile:
>>    [javac] /home/mike/tmp/apache-activemq-5.5.0/example/build.xml:152:
>> warning: 'includeantruntime' was not set, defaulting to
>> build.sysclasspath=last; set to false for repeatable builds
>>
>> producer:
>>     [echo] Running producer against server at $url = tcp://localhost:61616
>> for subject $subject = TEST.FOO
>>     [java] Connecting to URL: tcp://localhost:61616
>>     [java] Publishing a Message with size 1000 to queue: TEST.FOO
>>     [java] Using non-persistent messages
>>     [java] Sleeping between publish 0 ms
>>     [java] Running 1 parallel threads
>>     [java] [Thread-1] Sending message: 'Message with priority: 0 index: 0
>> sent at: Mon Nov...' with priority 0
>>     [java] [Thread-1] Sending message: 'Message with priority: 1 index: 1
>> sent at: Mon Nov...' with priority 1
>>     [java] [Thread-1] Sending message: 'Message with priority: 2 index: 2
>> sent at: Mon Nov...' with priority 2
>>     [java] [Thread-1] Sending message: 'Message with priority: 3 index: 3
>> sent at: Mon Nov...' with priority 3
>>     [java] [Thread-1] Sending message: 'Message with priority: 4 index: 4
>> sent at: Mon Nov...' with priority 4
>>     [java] [Thread-1] Sending message: 'Message with priority: 5 index: 5
>> sent at: Mon Nov...' with priority 5
>>     [java] [Thread-1] Sending message: 'Message with priority: 6 index: 6
>> sent at: Mon Nov...' with priority 6
>>     [java] [Thread-1] Sending message: 'Message with priority: 7 index: 7
>> sent at: Mon Nov...' with priority 7
>>     [java] [Thread-1] Sending message: 'Message with priority: 8 index: 8
>> sent at: Mon Nov...' with priority 8
>>     [java] [Thread-1] Sending message: 'Message with priority: 0 index: 9
>> sent at: Mon Nov...' with priority 0
>>     [java] [Thread-1] Sending message: 'Message with priority: 1 index: 10
>> sent at: Mon No...' with priority 1
>>     [java] [Thread-1] Sending message: 'Message with priority: 2 index: 11
>> sent at: Mon No...' with priority 2
>>     [java] [Thread-1] Sending message: 'Message with priority: 3 index: 12
>> sent at: Mon No...' with priority 3
>>     [java] [Thread-1] Sending message: 'Message with priority: 4 index: 13
>> sent at: Mon No...' with priority 4
>>     [java] [Thread-1] Sending message: 'Message with priority: 5 index: 14
>> sent at: Mon No...' with priority 5
>>     [java] [Thread-1] Sending message: 'Message with priority: 6 index: 15
>> sent at: Mon No...' with priority 6
>>    ...
>>
>> mike@dell:~/tmp/apache-activemq-5.5.0/example$ ant consumer
>> Buildfile: /home/mike/tmp/apache-activemq-5.5.0/example/build.xml
>>
>> init:
>>
>> compile:
>>    [javac] /home/mike/tmp/apache-activemq-5.5.0/example/build.xml:152:
>> warning: 'includeantruntime' was not set, defaulting to
>> build.sysclasspath=last; set to false for repeatable builds
>>
>> consumer:
>>     [echo] Running consumer against server at $url = tcp://localhost:61616
>> for subject $subject = TEST.FOO
>>     [java] Connecting to URL: tcp://localhost:61616
>>     [java] Consuming queue: TEST.FOO
>>     [java] Using a non-durable subscription
>>     [java] JMS Selector
>>     [java] Running 1 parallel threads
>>     [java] [Thread-1] We are about to wait until we consume: 180 message(s)
>> then we will shutdown
>>     [java] [Thread-1] Received: 'Message with priority: 7 index: 7 sent at:
>> Mon Nov...' (length 1000)
>>     [java] [Thread-1] Received: 'Message with priority: 8 index: 8 sent at:
>> Mon Nov...' (length 1000)
>>     [java] [Thread-1] Received: 'Message with priority: 8 index: 17 sent
>> at: Mon No...' (length 1000)
>>     [java] [Thread-1] Received: 'Message with priority: 7 index: 16 sent
>> at: Mon No...' (length 1000)
>>     [java] [Thread-1] Received: 'Message with priority: 6 index: 6 sent at:
>> Mon Nov...' (length 1000)
>>     [java] [Thread-1] Received: 'Message with priority: 6 index: 15 sent
>> at: Mon No...' (length 1000)
>>     [java] [Thread-1] Received: 'Message with priority: 5 index: 5 sent at:
>> Mon Nov...' (length 1000)
>>    ....
>> ======================
>>
>> Thanks in advance,
>> Mihai
>>
>> http://activemq.2283324.n4.nabble.com/file/n4040260/example.tgz example.tgz
>>
>>
>> --
>> View this message in context: http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4040260.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>
>
>
>-- 
>http://fusesource.com
>http://blog.garytully.com

Re: Message priority

Posted by Gary Tully <ga...@gmail.com>.
The fix will be in 5.6, have a peek at
https://issues.apache.org/jira/browse/AMQ-3596, you can validate it in
tonight's 5.6-SNAPSHOT or try one of the workarounds.

On 14 November 2011 17:33, Mihai Osian <Mi...@gmail.com> wrote:
>  Hello,
>
>  I am trying to enable priority consumption and I can't seem to get it
> working right. I am using ActiveMQ version 5.5.1 and I have followed the
> instructions from
> http://activemq.apache.org/how-can-i-support-priority-queues.html.
>  For testing I modified the example code that comes with ActiveMQ itself so
> that the ProducerTool sends 180 messages with cycling priority
> (0,1,..8,0,1,2...). After the producer finishes the job I start the consumer
> and look at the order in which the messages are received. The first few
> messages are sometimes received in an apparently random order, after which
> they start coming in descending order of their priority, as I expect.
>  I have also discovered that if I set queuePrefetch="1" in the
> "policyEntry" element from the XML configuration file I get an unsorted
> list. Similarly, setting it to 3 (for example) results in the output list
> being thoroughly randomized (messages are presumably sorted in groups of 3,
> but not quite).
>
>  My questions are:
> 1. Can somebody explain the results I see ? Am I missing something ?
> 2. Is there a way to get the all messages fully sorted in descending order,
> assuming one single consumer that is started after all messages are in the
> queue ?
> 3. The ActiveMQ documentation page (written for version 5.4) that I
> mentioned above says that KahaDB supports 3 levels of priority (<5, 5, and
>>5). However:
>   3a. The first few messages that I get are sometimes low-priority (<5).
>   3b. With a larger queuePrefetch most messages are fully sorted in
> descending order, so it may be that the page is out of date and in the
> latest version it is supposed to support the full priority range. Still, I
> don't understand the output.
>
> Other technical information:
> - ActiveMQ 5.5.1 (also tested 5.5.0, same behavior)
> - activemq started from the command line with:
>   ./bin/activemq console xbean:conf/activemq-priority.xml
> The activemq-priority.xml is a copy of the conf/activemq.xml, plus the
> following entry:
>  <policyEntry queue="TEST.FOO" prioritizedMessages="true"
> queuePrefetch="200" producerFlowControl="true"
> memoryLimit="1mb"></policyEntry>
> - A modified version of the ActiveMQ example (also containing the
> activemq-priority.xml file) can be found here:
> http://www.mediafire.com/?z7u5fhhlfbt3ano , if anybody wants to check the
> code.
>
> =========================================
> An example output copy-pasted from my console:
>
> mike@dell:~/tmp/apache-activemq-5.5.0/example$ ant producer
> Buildfile: /home/mike/tmp/apache-activemq-5.5.0/example/build.xml
>
> init:
>
> compile:
>    [javac] /home/mike/tmp/apache-activemq-5.5.0/example/build.xml:152:
> warning: 'includeantruntime' was not set, defaulting to
> build.sysclasspath=last; set to false for repeatable builds
>
> producer:
>     [echo] Running producer against server at $url = tcp://localhost:61616
> for subject $subject = TEST.FOO
>     [java] Connecting to URL: tcp://localhost:61616
>     [java] Publishing a Message with size 1000 to queue: TEST.FOO
>     [java] Using non-persistent messages
>     [java] Sleeping between publish 0 ms
>     [java] Running 1 parallel threads
>     [java] [Thread-1] Sending message: 'Message with priority: 0 index: 0
> sent at: Mon Nov...' with priority 0
>     [java] [Thread-1] Sending message: 'Message with priority: 1 index: 1
> sent at: Mon Nov...' with priority 1
>     [java] [Thread-1] Sending message: 'Message with priority: 2 index: 2
> sent at: Mon Nov...' with priority 2
>     [java] [Thread-1] Sending message: 'Message with priority: 3 index: 3
> sent at: Mon Nov...' with priority 3
>     [java] [Thread-1] Sending message: 'Message with priority: 4 index: 4
> sent at: Mon Nov...' with priority 4
>     [java] [Thread-1] Sending message: 'Message with priority: 5 index: 5
> sent at: Mon Nov...' with priority 5
>     [java] [Thread-1] Sending message: 'Message with priority: 6 index: 6
> sent at: Mon Nov...' with priority 6
>     [java] [Thread-1] Sending message: 'Message with priority: 7 index: 7
> sent at: Mon Nov...' with priority 7
>     [java] [Thread-1] Sending message: 'Message with priority: 8 index: 8
> sent at: Mon Nov...' with priority 8
>     [java] [Thread-1] Sending message: 'Message with priority: 0 index: 9
> sent at: Mon Nov...' with priority 0
>     [java] [Thread-1] Sending message: 'Message with priority: 1 index: 10
> sent at: Mon No...' with priority 1
>     [java] [Thread-1] Sending message: 'Message with priority: 2 index: 11
> sent at: Mon No...' with priority 2
>     [java] [Thread-1] Sending message: 'Message with priority: 3 index: 12
> sent at: Mon No...' with priority 3
>     [java] [Thread-1] Sending message: 'Message with priority: 4 index: 13
> sent at: Mon No...' with priority 4
>     [java] [Thread-1] Sending message: 'Message with priority: 5 index: 14
> sent at: Mon No...' with priority 5
>     [java] [Thread-1] Sending message: 'Message with priority: 6 index: 15
> sent at: Mon No...' with priority 6
>    ...
>
> mike@dell:~/tmp/apache-activemq-5.5.0/example$ ant consumer
> Buildfile: /home/mike/tmp/apache-activemq-5.5.0/example/build.xml
>
> init:
>
> compile:
>    [javac] /home/mike/tmp/apache-activemq-5.5.0/example/build.xml:152:
> warning: 'includeantruntime' was not set, defaulting to
> build.sysclasspath=last; set to false for repeatable builds
>
> consumer:
>     [echo] Running consumer against server at $url = tcp://localhost:61616
> for subject $subject = TEST.FOO
>     [java] Connecting to URL: tcp://localhost:61616
>     [java] Consuming queue: TEST.FOO
>     [java] Using a non-durable subscription
>     [java] JMS Selector
>     [java] Running 1 parallel threads
>     [java] [Thread-1] We are about to wait until we consume: 180 message(s)
> then we will shutdown
>     [java] [Thread-1] Received: 'Message with priority: 7 index: 7 sent at:
> Mon Nov...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 8 index: 8 sent at:
> Mon Nov...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 8 index: 17 sent
> at: Mon No...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 7 index: 16 sent
> at: Mon No...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 6 index: 6 sent at:
> Mon Nov...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 6 index: 15 sent
> at: Mon No...' (length 1000)
>     [java] [Thread-1] Received: 'Message with priority: 5 index: 5 sent at:
> Mon Nov...' (length 1000)
>    ....
> ======================
>
> Thanks in advance,
> Mihai
>
> http://activemq.2283324.n4.nabble.com/file/n4040260/example.tgz example.tgz
>
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Message-priority-tp4040260p4040260.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
http://fusesource.com
http://blog.garytully.com