Posted to users@activemq.apache.org by nico <ni...@gmail.com> on 2012/05/04 12:02:07 UTC

Real Priority Message Consuming in ActiveMQ

Hi there,
we are using ActiveMQ 5.5.2 as a message broker to distribute jobs
across systems and as a safeguard against failures.

Now a new requirement has arisen: messages should be consumed in order of
their priority, so that important jobs are processed before unimportant
ones.

Looking at the documentation and the often-cited FAQ entry "How
can I support priority queues?", it seems quite easy: activate
priority message consuming with the boolean prioritizedMessages and add a
little configuration on the publisher side to include priorities in your JMS
messages.
But it doesn't work, at least not the way I would expect real priority
message consuming to work, namely by always pulling the highest-priority
message out of the queue. Several blog entries and JIRA issues even suggest
that it is not supposed to work that way. Maybe messages are ordered at an
entry-buffer level when they enter the queue simultaneously, but not
once they are persisted in the queue.

So then I looked into the "Alternatives" that are also listed for priority
message consuming.
The selector approach doesn't suit our needs, because we only have a small
number of consumers, all of which have to process both high- and
low-priority jobs.
The resequencer approach sounded promising too, until I realized that it is
not intended to resequence messages within a queue, only in the
transition from one queue to another. I implemented two queues, with
producer and consumer connected by a BatchResequencer, but
that doesn't work very well.
The problem is this: say 100 low-priority messages and 1 high-priority
message come in during the first minute. The resequencer takes those
and writes them to the consumer queue with the high-priority message
first. Fine. But then in the second minute 50 more high-priority messages
enter the producer queue. The resequencer takes those messages and puts them
behind the low-priority messages in the consumer queue. So there is no more
prioritization. One can of course play around with the timeouts and batch
sizes, but you will never reach "real priority message consuming", where a
higher-priority message is always consumed before a lower-priority
message.
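
To make the batch problem concrete, here is a minimal sketch in plain Python
(no ActiveMQ or Camel involved). It assumes the consumer is busy with a
long-running job and does not drain the consumer queue between batches; the
function name and the message labels are made up for illustration.

```python
from collections import deque


def batch_resequence(batches):
    """Sort each batch by priority (highest first) and append it to a FIFO queue."""
    consumer_queue = deque()
    for batch in batches:
        # sorted() is stable, so arrival order is preserved within one priority.
        consumer_queue.extend(sorted(batch, key=lambda msg: -msg[0]))
    return list(consumer_queue)


# Messages are (priority, label) tuples; 9 is high, 0 is low.
minute_1 = [(0, "low-1"), (0, "low-2"), (9, "high-1")]
minute_2 = [(9, "high-2"), (9, "high-3")]

order = [label for _, label in batch_resequence([minute_1, minute_2])]
print(order)
# ['high-1', 'low-1', 'low-2', 'high-2', 'high-3']
```

Each batch is ordered correctly on its own, but the second batch's
high-priority messages still land behind the first batch's low-priority ones.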

So I have two issues I would like to discuss:
1. Does ActiveMQ support real prioritized message consuming, where a
high-priority message is always consumed before a low-priority message, even
if they entered the queue minutes apart?
2. If it doesn't, why does the FAQ entry "How can I support priority queues?"
state that it does? I didn't see a single remark on there like "Oh, and if
you mean 'high prio before low prio, always' by message priority, then sorry,
no, we don't support that".

btw: I know the JMS specification doesn't require priority ordering,
or any ordering, but I looked into HornetQ, for example, and they have a
perfect implementation of real priority message consuming. So it should be
possible.

Hope this wasn't too long for anyone to answer ;o)
Thanks in advance!

Nico



--
View this message in context: http://activemq.2283324.n4.nabble.com/Real-Priority-Message-Consuming-in-ActiveMQ-tp4608262.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Real Priority Message Consuming in ActiveMQ

Posted by nico <ni...@gmail.com>.
Thanks for the quick response.
So you're saying that ActiveMQ does support my use case, right?
The JUnit test looks okay too. I had just read some blog entries and JIRA
issues for ActiveMQ suggesting that prioritized queues would only be
supported in 6.0, but I guess those posts were outdated.

I already had the prefetch down to 0, since the queue only carries a small
number of long-running tasks.
So I guess it must be some configuration issue.
I'm using ActiveMQ 5.5.1 as a web application WAR on JBoss 7.1, and I
configure the queues etc. via Spring in my application.
In the ActiveMQ web console I can see the messages landing in the queue with
the correct priorities set, but my application shows that they are
consumed in the order in which they arrived in the queue.
When I switch to HornetQ, with only minimal adjustments to the
ConnectionFactory in Spring, prioritized consumption works, so I guess the
production of prioritized JMS messages is fine.

So how can I check whether prioritization is really activated for a queue?
It seems to make no difference at all whether I include the policyEntry with
prioritizedMessages="true" or not.
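
One broker-independent way to check this is to fill the queue with mixed
priorities while the consumer is stopped, then start the consumer, log the
priority of every message it receives, and look for inversions in that log.
The helper below is only a sketch of that check in plain Python. The function
name and the sample logs are made up, and it assumes the whole backlog was
enqueued before consumption started (otherwise a late high-priority arrival
would look like an inversion).

```python
def first_priority_inversion(consumed_priorities):
    """Return the index of the first message that has a higher priority than
    the message consumed just before it, or None if there is no inversion."""
    for i in range(1, len(consumed_priorities)):
        if consumed_priorities[i] > consumed_priorities[i - 1]:
            return i
    return None


# Hypothetical logs of the JMSPriority values seen by the consumer.
prioritized_log = [9, 9, 9, 0, 0]   # what strict priority order looks like
arrival_order_log = [0, 0, 9, 0, 9]  # what plain arrival order can look like

print(first_priority_inversion(prioritized_log))    # None
print(first_priority_inversion(arrival_order_log))  # 2
```

If the second kind of log shows up with prefetch set to 0, the queue is most
likely not applying priorities at all.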

Here's my configuration:

activemq.xml

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:osgi="http://www.springframework.org/schema/osgi"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
  http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
  http://www.springframework.org/schema/osgi
http://www.springframework.org/schema/osgi/spring-osgi.xsd">

  <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

  <broker brokerName="web-console" start="false" persistent="true"
useJmx="true" xmlns="http://activemq.apache.org/schema/core">

    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue=">" prioritizedMessages="true"/>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

    <managementContext>
      <managementContext createConnector="false"/>
    </managementContext>

    <persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#jms-ds"/>
    </persistenceAdapter>

    <plugins>
    <simpleAuthenticationPlugin anonymousAccessAllowed="false">
        <users>
            <authenticationUser username="****" password="****"
groups="users,admins" />
        </users>
    </simpleAuthenticationPlugin>
    <authorizationPlugin>
        <map>
            <authorizationMap>
                <authorizationEntries>
                    <authorizationEntry queue=">" read="users"
write="users" admin="admins" />
                    <authorizationEntry topic=">" read="users"
write="users" admin="admins" />
                    <authorizationEntry topic="ActiveMQ.Advisory.>"
read="users" write="users" admin="users" />
                </authorizationEntries>
                <tempDestinationAuthorizationEntry>
                    <tempDestinationAuthorizationEntry read="users"
write="users" admin="admins" />
                </tempDestinationAuthorizationEntry>
            </authorizationMap>
        </map>
    </authorizationPlugin>
    </plugins>

    <transportConnectors>
      <transportConnector name="default"
uri="tcp://${activemq.transport.host}:#{${activemq.transport.port} +
${jboss.socket.binding.port-offset}}"/>
    </transportConnectors>
  </broker>

  <bean id="jms-ds" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="java:jboss/datasources/GXLE"/>
  </bean>

</beans>


And the Spring config in my application:

<!-- Pool Connection Factory ActiveMQ -->
<bean id="jmsConnectionFactory"
      class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL">
        <value>${ACTIVEMQ_TRANSPORT}</value>
    </property>
    <property name="userName" value="****" />
    <property name="password" value="****" />
    <property name="prefetchPolicy">
        <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
            <property name="all" value="0" />
        </bean>
    </property>
    <property name="messagePrioritySupported" value="true" />
</bean>

<bean id="highlyPrioritizedPublisherTemplate"
      class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="sessionTransacted" value="true" />
    <property name="explicitQosEnabled" value="true" />
    <property name="priority" value="9" />
</bean>

<bean id="lowPrioritizedPublisherTemplate"
      class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="sessionTransacted" value="true" />
    <property name="explicitQosEnabled" value="true" />
    <property name="priority" value="0" />
</bean>

<bean id="berechnungJobQueue"
      class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0"
                     value="gasx.grid.berechnungJobQueue" />
</bean>




Re: Real Priority Message Consuming in ActiveMQ

Posted by Gary Tully <ga...@gmail.com>.
Hmm. Prefetch may be causing you some trouble. Use prefetch=0 to
ensure each receive asks the broker for the next highest-priority
message; otherwise a newly created high-priority message could be
backed up behind prefetched low-priority ones.
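
The effect can be sketched with a toy model in plain Python (this is not
ActiveMQ code). The Broker class, the function names and the refill behaviour
are simplifying assumptions: the consumer here only refills its local buffer
once it is empty, and prefetch=0 is modelled as pulling exactly one message
per receive.

```python
import heapq
from itertools import count


class Broker:
    """Toy broker that always hands out the highest-priority pending message."""

    def __init__(self):
        self._heap = []
        self._seq = count()  # arrival counter keeps FIFO order within a priority

    def send(self, priority, label):
        heapq.heappush(self._heap, (-priority, next(self._seq), label))

    def dispatch(self):
        return heapq.heappop(self._heap)[2] if self._heap else None


def consume_all(broker, prefetch, late_arrivals):
    """Consume every message; late_arrivals are sent while the first job runs."""
    consumed, local_buffer = [], []
    pending_late = list(late_arrivals)
    while True:
        if not local_buffer:
            for _ in range(max(prefetch, 1)):
                msg = broker.dispatch()
                if msg is None:
                    break
                local_buffer.append(msg)
        if not local_buffer:
            return consumed
        consumed.append(local_buffer.pop(0))
        # New messages reach the broker while the consumer is busy.
        for priority, label in pending_late:
            broker.send(priority, label)
        pending_late = []


def broker_with_backlog():
    broker = Broker()
    for i in range(1, 4):
        broker.send(0, f"low-{i}")
    return broker


late = [(9, "high-1")]
print(consume_all(broker_with_backlog(), prefetch=3, late_arrivals=late))
# ['low-1', 'low-2', 'low-3', 'high-1']
print(consume_all(broker_with_backlog(), prefetch=0, late_arrivals=late))
# ['low-1', 'high-1', 'low-2', 'low-3']
```

With a prefetch of 3 the late high-priority message waits behind everything
already buffered on the consumer; with prefetch 0 it is picked up on the very
next receive.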

Note the test for priority: org.apache.activemq.store.MessagePriorityTest
http://svn.apache.org/repos/asf/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java

If you can provide some variation of that test that demonstrates a
problem, we can easily look into it.

Try a recent Fuse release[1], the current Apache snapshot[2] or the
5.6 release candidate[3]; any of them should suffice.

[1] http://repo.fusesource.com/nexus/content/repositories/releases/org/apache/activemq/apache-activemq/5.5.1-fuse-04-01/
[2] https://repository.apache.org/content/repositories/snapshots/org/apache/activemq/apache-activemq/5.7-SNAPSHOT/
[3] https://repository.apache.org/content/repositories/orgapacheactivemq-025/org/apache/activemq/apache-activemq/5.6.0/





-- 
http://fusesource.com
http://blog.garytully.com