Posted to users@activemq.apache.org by Michel Jung <mi...@gmail.com> on 2021/01/17 09:48:52 UTC

ActiveMQ: How do I limit the number of messages being dispatched?

Let's say I have one ActiveMQ broker and an undefined number of consumers.

Problem:

* To process a message, consumers need an external service, which is either
"DATA1" or "DATA2" (specified in the message)
* Each server, "DATA1" and "DATA2", can only handle 20 connections
* So at most 20 "DATA1" and 20 "DATA2" messages must be dispatched at any
time
* Because of prioritization, the messages must be enqueued in the same queue

How can this be solved? As long as I was using message pulling (prefetch of
0), I was able to do this with a BrokerPlugin that, on messagePull,
enforced the limits using semaphores and selectors. If the limits were
reached, the pull returned null.
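
For illustration, the gating logic behind such a plugin could be sketched
roughly as follows. This is not the original plugin: it contains no ActiveMQ
code, and the class name DispatchGate, its methods, and the message property
name "server" are all hypothetical. In a real broker plugin, something like
this would presumably be consulted from the messagePull hook.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;

/** Hypothetical helper: caps the number of in-flight messages per external service. */
class DispatchGate {
    // Populated once in the constructor and only read afterwards.
    private final Map<String, Semaphore> permits = new LinkedHashMap<>();

    DispatchGate(int limitPerService, String... services) {
        for (String service : services) {
            permits.put(service, new Semaphore(limitPerService));
        }
    }

    /** Reserves a slot for the given service without blocking; false if the limit is reached. */
    boolean tryAcquire(String service) {
        Semaphore s = permits.get(service);
        return s != null && s.tryAcquire();
    }

    /** Frees a slot once the consumer has finished with the message. */
    void release(String service) {
        Semaphore s = permits.get(service);
        if (s != null) {
            s.release();
        }
    }

    /**
     * Builds a JMS-style selector matching only services that currently have
     * free slots, or empty if every service is at its limit (the "return null"
     * case). The property name "server" is an assumption. The result is only a
     * snapshot, so the caller still has to tryAcquire before dispatching.
     */
    Optional<String> selectorForAvailable() {
        String names = permits.entrySet().stream()
                .filter(e -> e.getValue().availablePermits() > 0)
                .map(e -> "'" + e.getKey() + "'")
                .collect(Collectors.joining(", "));
        return names.isEmpty()
                ? Optional.empty()
                : Optional.of("server IN (" + names + ")");
    }
}
```

With new DispatchGate(20, "DATA1", "DATA2"), a pull would be answered only
while selectorForAvailable() is non-empty, and release() would be called when
the message is acknowledged or the consumer goes away.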

However, due to performance issues, I had to set prefetch to 1 and use push
instead. As a result, my messagePull hack no longer works (it's never called).

So far I'm considering implementing a custom Cursor, but I was wondering if
someone knows a better solution.

I also posted this on StackOverflow:
<https://stackoverflow.com/questions/65738699/activemq-how-do-i-limit-the-number-of-messages-being-dispatched>

Re: ActiveMQ: How do I limit the number of messages being dispatched?

Posted by Matt Pavlovich <ma...@gmail.com>.
Michel-

Bringing my SO answer over here as well: queues do this naturally, and the consumer.receive() method is blocking, so this should “just work”, unless some detail is missing from your description.

It would help to understand: what behavior are you seeing that is _not_ what you expect?

----
Queues give you buffering for system processing time for free. Messages are delivered on demand, and prefetch=0 or prefetch=1 should effectively get you there: a message is only delivered to a consumer when the consumer is ready for it (i.e., during the consumer.receive() call).

consumer.receive() is a blocking call, so you should not need a custom plugin or any other mechanism to delay delivery until the consumer process (and its required downstream services) is ready to handle a message.

This behavior should work out of the box, unless there are details of your use case that were not provided and that would shed more light on the scenario.

-Matt Pavlovich

> On Jan 18, 2021, at 2:39 AM, Simon Lundström <si...@su.se> wrote:
> 
> On Sun, 2021-01-17 at 10:48:52 +0100, Michel Jung wrote:
>> Let's say I have one ActiveMQ Broker and an undefined number of consumers.
>> 
>> Problem:
>> 
>> * To process a message, consumers need an external service which is either
>> "DATA1" or "DATA2" (specified in the message)
>> * Each server, "DATA1" and "DATA2", can only handle 20 connections
>> * So at most 20 "DATA1" and 20 "DATA2" messages must be dispatched at any
>> time
>> * Because of prioritization, the messages must be enqueued in the same queue
> 
> I'm probably naive but:
> * Put the servername ("DATA{1,2}") in a header so you can use it as a
>    selector https://activemq.apache.org/selectors.html
> * Tell each of the consumers to use the correct selector
> * Set the consumer to have 20 worker threads/executors so that you only
>    have 20 connections per consumer.
> * If you need redundancy (since you can't have performance with the 20
>    connections I'm guessing?) add another consumer and use the exclusive
>    consumer https://activemq.apache.org/exclusive-consumer.html
> 
> 
> As I said, naive but I don't see why it wouldn't work.
> 
> BR,
> - Simon


Re: ActiveMQ: How do I limit the number of messages being dispatched?

Posted by Simon Lundström <si...@su.se>.
On Sun, 2021-01-17 at 10:48:52 +0100, Michel Jung wrote:
>Let's say I have one ActiveMQ Broker and an undefined number of consumers.
>
>Problem:
>
>* To process a message, consumers need an external service which is either
>"DATA1" or "DATA2" (specified in the message)
>* Each server, "DATA1" and "DATA2", can only handle 20 connections
>* So at most 20 "DATA1" and 20 "DATA2" messages must be dispatched at any
>time
>* Because of prioritization, the messages must be enqueued in the same queue

I'm probably naive but:
* Put the servername ("DATA{1,2}") in a header so you can use it as a 
   selector https://activemq.apache.org/selectors.html
* Tell each of the consumers to use the correct selector
* Set the consumer to have 20 worker threads/executors so that you only 
   have 20 connections per consumer.
* If you need redundancy (since you can't have performance with the 20 
   connections I'm guessing?) add another consumer and use the exclusive 
   consumer https://activemq.apache.org/exclusive-consumer.html
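
A rough sketch of the consumer side of the list above, under stated
assumptions: the class ServiceConsumer and its methods are hypothetical, the
header name "server" is made up, and no JMS code is included. In a real
consumer the selector string would presumably be passed to
Session.createConsumer(destination, selector), and submit() would be called
from the receive loop, so the loop stalls while all workers are busy.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical consumer-side wrapper: one instance per external service. */
class ServiceConsumer {
    private final String service;
    private final ExecutorService workers;
    private final Semaphore slots;

    ServiceConsumer(String service, int maxConnections) {
        this.service = service;
        this.workers = Executors.newFixedThreadPool(maxConnections);
        this.slots = new Semaphore(maxConnections);
    }

    /** Selector that matches only this service; "server" is an assumed header name. */
    String selector() {
        return "server = '" + service + "'";
    }

    /**
     * Blocks the caller until a worker is free, then processes the message on
     * the pool. Blocking here keeps the receive loop from fetching more
     * messages than the external service has connections for.
     */
    void submit(String message, Consumer<String> handler) {
        slots.acquireUninterruptibly();
        try {
            workers.execute(() -> {
                try {
                    handler.accept(message);
                } finally {
                    slots.release();
                }
            });
        } catch (RuntimeException e) {
            slots.release(); // task was rejected, so give the slot back
            throw e;
        }
    }

    /** Stops accepting work and waits briefly for running handlers; true if all finished. */
    boolean shutdown() {
        workers.shutdown();
        try {
            return workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The semaphore is what caps concurrency here: without it, the executor would
queue an unbounded number of pending messages in memory instead of leaving
them on the broker.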


As I said, naive but I don't see why it wouldn't work.

BR,
- Simon