You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2012/02/01 10:15:10 UTC

Re: Using SEDA without losing messages

On Mon, Jan 30, 2012 at 1:00 PM, Sabin Timalsena
<st...@veriskhealth.com> wrote:
> Hi, thanks a lot! That definitely was very helpful.
>
> However, if I use the concurrentConsumers option, I guess it means that I'll
> have separate JMS consumers with their own separate prefetch buffers.
> Would it be possible to control the options like prefetch buffers, consumer
> priority etc (the ActiveMQ destination options)? I read that destination
> options would be available in camel URIs from ActiveMQ 5.6 onwards. However,
> what will be the case when specifying, say concurrentConsumers=5. Will this
> create 5 identical JMS consumers, or do some sort of multiplexing?
>

Good questions.

There will only be one physical connection.

However each concurrent consumer will appear as an individual
subscriber of the queue.
So for example having concurrentConsumers=5, will have 5 subscribers
of the queue.
And each subscriber has its own prefetch buffer, etc.

However AMQ tries to distribute the messages evenly between the
subscribers (if one is not slow etc.)
So if you run a test and each subscriber takes the same amount of time
to process the message, it will
look as if they do round robin processing. However if one consumer
gets slow, the others will be faster than it etc.

You can see stats and whatnot from jconsole, as AMQ publishes many
stats about itself in the JMX.
Each subscriber has a enqueue/dequed counter you can see how many
messages it have processed.




> -----Original Message-----
> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
> Sent: Monday, January 30, 2012 5:19 PM
> To: users@camel.apache.org
> Subject: Re: Using SEDA without losing messages
>
> Hi
>
> For parallel processing, use the concurrentConsumers |
> maxConcurrentConsumers on the JMS endpoint. That is better.
>
>
> On Mon, Jan 30, 2012 at 12:06 PM, Sabin Timalsena
> <st...@veriskhealth.com> wrote:
>> Well, I wanted to include SEDA for the parallel processing of messages.
>> Maybe I should be trying something else? Like the Threads DSL maybe. I
>> haven't taken a good look at the Threads DSL though. What would you
>> recommend? My scenario basically is like this:
>>
>> - Messages arrive in an ActiveMQ queue
>> - There are several consumers on that queue with different consumer
>> priorities, prefetch buffers etc.
>> - Messages received be each consumer should be processed parallelly
>> - Messages should not be dequeued unless they can be serviced, i.e. if
>> one consumer fails to process the message, the others should get that
>> chance
>>
>> I'd gladly appreciate any suggestions.
>>
>>
>> -----Original Message-----
>> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
>> Sent: Monday, January 30, 2012 3:42 PM
>> To: users@camel.apache.org
>> Subject: Re: Using SEDA without losing messages
>>
>> On Mon, Jan 30, 2012 at 10:48 AM, Sabin Timalsena
>> <st...@veriskhealth.com> wrote:
>>> Hi
>>>
>>> Yes, I tried setting blockWhenFull=true. However it seems that the
>>> caller blocks AFTER de-queuing the message from the activeMQ queue.
>>> So, if the size of the seda queue is 4, then I'm having 5 messages
>>> being dequeued, the fifth one probably waiting to get into the seda
>>> queue, and might have to wait a long time. In my situation, I have
>>> multiple distributed activeMQ consumers, each one feeding messages to
>>> a
>> SEDA queue for asynchronous processing.
>>> Therefore, if the message is dequeued by a full SEDA queue, it will
>>> have to wait on the full queue and can't be handled by other activeMQ
>> consumers.
>>>
>>
>> Yes of course it does, the JMS consumer can pickup the message
>> regardless, what the SEDA queue size is.
>>
>> You would need to use something a like the throttling route policy, to
>> be able at runtime to suspend/resume the route.
>> So it only is active if the SEDA queue is not full.
>>
>> Then you can code some custom logic in your route policy, that checks
>> if the SEDA queue is full or not. If its full then it suspend the route.
>> And when the SEDA queue is no longer full, it resumes the route.
>>
>> http://camel.apache.org/routepolicy
>>>
>>> That said, why do you need the SEDA queues at all?
>>> If you just process the message straight, then you only pickup new
>> messages from the JMS queue, when the previous message has been fully
>> processed.
>>>
>>
>>> I hope I have described my problem well. Please tell me if I'm not
>>> clear enough.
>>>
>>> Thanks
>>>
>>> -----Original Message-----
>>> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
>>> Sent: Monday, January 30, 2012 3:03 PM
>>> To: users@camel.apache.org
>>> Subject: Re: Using SEDA without losing messages
>>>
>>> Hi
>>>
>>> Check the seda documentation.
>>> http://camel.apache.org/seda
>>>
>>> There is a blockWhenFull option you can use to set true to have the
>>> caller block if the seda queue is full
>>>
>>>
>>>
>>> On Mon, Jan 30, 2012 at 7:20 AM, Sabin Timalsena
>>> <st...@veriskhealth.com> wrote:
>>>> Hello,
>>>>
>>>>
>>>>
>>>> I'm a beginner in camel and ActiveMQ and was recently trying to
>>>> study the behavior of SEDA queues.
>>>>
>>>> I'm not sure I understand the "size" property of SEDA queues. Say
>>>> the queue has (size=4) and (concurrentConsumers=4). 4 messages are
>>>> brought into the queue. My understanding is that, as soon as the
>>>> processing of those 4 messages is started, 4 more are brought into
>>>> the
>> SEDA queue.
>>>> So my assumption was that 8 messages would be dequeued initially
>>>> from the place this SEDA queue is consuming from. However, in my
>>>> tests, 9 messages were dequeued.
>>>>
>>>>
>>>>
>>>> Here's the setup I used for testing this behavior:
>>>>
>>>>
>>>>
>>>>                                private static final String SEDA_URI
>>>> = "seda:tasks?size=4&concurrentConsumers=4&blockWhenFull=true";
>>>>
>>>>                                ...
>>>>
>>>>
>>>>
>>>>                                from("activemq:start1")
>>>>
>>>>                                .wireTap("direct:wiretap")
>>>>
>>>>                                .to(SEDA_URI);
>>>>
>>>>
>>>>
>>>>                                from(SEDA_URI)
>>>>
>>>>                                .process(new Processor() {
>>>>
>>>>
>>>>
>>>>                                                @Override
>>>>
>>>>                                                public void
>>>> process(Exchange
>>>> ex) throws Exception {
>>>>
>>>>
>>>> Message in = ex.getIn();
>>>>
>>>>
>>>> LOGGER.info("Procesing Message: " + in.getBody());
>>>>
>>>>
>>>>
>>>> Thread.sleep(10000);
>>>>
>>>>                                                }
>>>>
>>>>                                });
>>>>
>>>>
>>>>
>>>>                                from("direct:wiretap")
>>>>
>>>>                                .process(new Processor() {
>>>>
>>>>
>>>>
>>>>                                                @Override
>>>>
>>>>                                                public void
>>>> process(Exchange
>>>> exchange) throws Exception {
>>>>
>>>>
>>>> Message in = exchange.getIn();
>>>>
>>>>
>>>> LOGGER.info("Tapped Message: " + in.getBody());
>>>>
>>>>                                                }
>>>>
>>>>                                });
>>>>
>>>>
>>>>
>>>> The "activemq:start1" has 20 messages initially.
>>>>
>>>> Here's the output obtained just after the test is started:
>>>>
>>>>
>>>>
>>>> [ient) thread #5 - seda://tasks] SEDATests                      INFO
>>>> Procesing Message: Message 0
>>>>
>>>> [ient) thread #2 - seda://tasks] SEDATests                      INFO
>>>> Procesing Message: Message 1
>>>>
>>>> [ient) thread #3 - seda://tasks] SEDATests                      INFO
>>>> Procesing Message: Message 2
>>>>
>>>> [ient) thread #4 - seda://tasks] SEDATests                      INFO
>>>> Procesing Message: Message 3
>>>>
>>>> [el-client) thread #6 - WireTap] SEDATests                      INFO
>>>> Tapped
>>>> Message: Message 0
>>>>
>>>> [el-client) thread #7 - WireTap] SEDATests                      INFO
>>>> Tapped
>>>> Message: Message 1
>>>>
>>>> [el-client) thread #8 - WireTap] SEDATests                      INFO
>>>> Tapped
>>>> Message: Message 2
>>>>
>>>> [el-client) thread #9 - WireTap] SEDATests                      INFO
>>>> Tapped
>>>> Message: Message 3
>>>>
>>>> [l-client) thread #10 - WireTap] SEDATests                      INFO
>>>> Tapped
>>>> Message: Message 4
>>>>
>>>> [l-client) thread #11 - WireTap] SEDATests                      INFO
>>>> Tapped
>>>> Message: Message 5
>>>>
>>>> [l-client) thread #12 - WireTap] SEDATests                      INFO
>>>> Tapped
>>>> Message: Message 6
>>>>
>>>> [l-client) thread #13 - WireTap] SEDATests                      INFO
>>>> Tapped
>>>> Message: Message 7
>>>>
>>>> [l-client) thread #14 - WireTap] SEDATests                      INFO
>>>> Tapped
>>>> Message: Message 8
>>>>
>>>>
>>>>
>>>> When I change the SEDA endpoint URI to have "size=1", 6 messages are
>>>> dequeued initially. I don't understand why that extra one message is
>>>> being dequeued. When full, does SEDA block *after* dequeuing the
> message?
>>>>
>>>>
>>>>
>>>> My objective is to prevent messages from being dequeued from
>>>> "activemq:start1", if they won't be processed immediately.
>>>>
>>>> Please give me some suggestions
>>>>
>>>>
>>>>
>>>> Thanks in advance...
>>>>
>>>
>>>
>>>
>>> --
>>> Claus Ibsen
>>> -----------------
>>> FuseSource
>>> Email: cibsen@fusesource.com
>>> Web: http://fusesource.com
>>> Twitter: davsclaus, fusenews
>>> Blog: http://davsclaus.blogspot.com/
>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>>
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> FuseSource
>> Email: cibsen@fusesource.com
>> Web: http://fusesource.com
>> Twitter: davsclaus, fusenews
>> Blog: http://davsclaus.blogspot.com/
>> Author of Camel in Action: http://www.manning.com/ibsen/
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/