Posted to users@camel.apache.org by Tim Dudgeon <td...@gmail.com> on 2012/09/11 19:51:47 UTC
SEDA and concurrentConsumers
I'm trying to use the SEDA component with the concurrentConsumers option.
I'm setting a queue size and expecting the calling thread to block, but
it's not working for me.
I have Groovy code like this:
def vals = 1..10
println vals

CamelContext camelContext = new DefaultCamelContext()
camelContext.addRoutes(new RouteBuilder() {
    def void configure() {
        from('direct:start')
            .split(body())
            .log('direct ${body}')
            .to('seda:insert')

        from('seda:insert?concurrentConsumers=3&blockWhenFull=true&size=5')
            .delay(100)
            .log('seda ${body}')
    }
})
camelContext.start()

def template = camelContext.createProducerTemplate()
println 'starting'
template.sendBody('direct:start', vals)
println 'done'
and it's outputting this:
915 [main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.10.1 (CamelContext: camel-1) started in 0.348 seconds
starting
941 [main] INFO route1 - direct 1
944 [main] INFO route1 - direct 2
944 [main] INFO route1 - direct 3
945 [main] INFO route1 - direct 4
945 [main] INFO route1 - direct 5
945 [main] INFO route1 - direct 6
946 [main] INFO route1 - direct 7
946 [main] INFO route1 - direct 8
946 [main] INFO route1 - direct 9
946 [main] INFO route1 - direct 10
done
2009 [Camel (camel-1) thread #2 - seda://insert] INFO route2 - seda 3
2009 [Camel (camel-1) thread #1 - seda://insert] INFO route2 - seda 2
2009 [Camel (camel-1) thread #0 - seda://insert] INFO route2 - seda 1
2110 [Camel (camel-1) thread #0 - seda://insert] INFO route2 - seda 6
2110 [Camel (camel-1) thread #1 - seda://insert] INFO route2 - seda 4
2110 [Camel (camel-1) thread #2 - seda://insert] INFO route2 - seda 5
2210 [Camel (camel-1) thread #0 - seda://insert] INFO route2 - seda 7
2210 [Camel (camel-1) thread #1 - seda://insert] INFO route2 - seda 8
2210 [Camel (camel-1) thread #2 - seda://insert] INFO route2 - seda 9
2310 [Camel (camel-1) thread #0 - seda://insert] INFO route2 - seda 10
Clearly the calling route is not blocking.
What am I doing wrong?
This is with 2.10.
Tim
Re: SEDA and concurrentConsumers
Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Sep 12, 2012 at 9:31 AM, Tim Dudgeon <td...@gmail.com> wrote:
> OK, thanks. So I think I get it now. Some params need defining on the 'from'
> side and some on the 'to' side. And the order the queue appears in the
> routes is important. So this seems to work for me. Is this correct?
>
> from('seda:insert?concurrentConsumers=2&size=3')
>     .delay(1000)
>     .log('seda ${body}')
>
> from('direct:start')
>     .split(body())
>     .log('direct ${body}')
>     .to('seda:insert?blockWhenFull=true')
>
> For instance, if the two routes are defined in the other order, it doesn't
> work as wanted.
>
Yes, the size option matters: the first endpoint that creates the
queue controls its size.
So if you want to limit the queue size, make sure the size option is on
the first endpoint, or better yet on all the endpoints.
But the block option is only for the producer.
Another option is to define the endpoint once, and then refer to it in the routes:
configure() {
    Endpoint insert =
        endpoint('seda:insert?concurrentConsumers=2&size=3&blockWhenFull=true');
    from(insert)
    ...
    ...
    to(insert)
}
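To illustrate why the first endpoint matters, here is a minimal plain-JDK sketch. It is not Camel's actual code. It assumes the component keeps one shared queue per name and that an endpoint without a size option creates an unbounded queue, which matches the behaviour described in this thread. The class SharedQueues and its getOrCreate method are hypothetical names invented for the example.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of a component that shares one queue per name.
// This is NOT Camel's SedaComponent; it only mimics "first creator wins".
class SharedQueues {
    private final Map<String, BlockingQueue<Object>> queues = new ConcurrentHashMap<>();

    // size <= 0 stands for "no size option given", i.e. an unbounded queue.
    BlockingQueue<Object> getOrCreate(String name, int size) {
        return queues.computeIfAbsent(name, n ->
                size > 0
                        ? new LinkedBlockingQueue<Object>(size)
                        : new LinkedBlockingQueue<Object>());
    }

    public static void main(String[] args) {
        SharedQueues component = new SharedQueues();

        // The first endpoint has no size option, so it creates an unbounded queue.
        BlockingQueue<Object> first = component.getOrCreate("insert", 0);

        // The second endpoint asks for size=5 but gets the existing queue back.
        BlockingQueue<Object> second = component.getOrCreate("insert", 5);

        System.out.println("same queue: " + (first == second));
        System.out.println("still unbounded: "
                + (second.remainingCapacity() == Integer.MAX_VALUE));
    }
}
```

Under these assumptions, the 'to' endpoint in Tim's original routes would play the role of the first call, so the size=5 on the 'from' endpoint would have no effect and the queue would never fill up.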
--
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
Re: SEDA and concurrentConsumers
Posted by Tim Dudgeon <td...@gmail.com>.
OK, thanks. So I think I get it now. Some params need defining on the
'from' side and some on the 'to' side. And the order the queue appears
in the routes is important. So this seems to work for me. Is this correct?
from('seda:insert?concurrentConsumers=2&size=3')
    .delay(1000)
    .log('seda ${body}')

from('direct:start')
    .split(body())
    .log('direct ${body}')
    .to('seda:insert?blockWhenFull=true')
For instance, if the two routes are defined in the other order, it
doesn't work as wanted.
Tim
Re: SEDA and concurrentConsumers
Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Sep 12, 2012 at 8:52 AM, Tim Dudgeon <td...@gmail.com> wrote:
> Great. That works. Thanks.
> BTW, the example in the Camel in Action book (p 322) seems wrong then as it
> describes it the way I originally had it.
>
The book is not wrong. The block parameter is on the *producer* side,
so you should have it in the "to".
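As a rough illustration of what producer-side blocking means, the following plain-JDK sketch contrasts two ways a sender can treat a full bounded queue. Whether this mirrors what the SEDA producer does internally is an assumption; the class ProducerBlockingSketch and the methods failFastSend and blockingSend are made-up names, not Camel API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: the decision to wait or fail is made by the sender,
// which is why an option like blockWhenFull belongs on the "to" side.
class ProducerBlockingSketch {

    // Models a producer without blocking: a full queue rejects the message.
    static boolean failFastSend(BlockingQueue<Integer> queue, int value) {
        try {
            queue.add(value); // throws IllegalStateException when the queue is full
            return true;
        } catch (IllegalStateException queueFull) {
            return false;
        }
    }

    // Models a blocking producer: the calling thread waits until there is space.
    static boolean blockingSend(BlockingQueue<Integer> queue, int value) {
        try {
            queue.put(value);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);

        // A slow consumer, comparable to the route with delay(...).
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take();
                    Thread.sleep(100);
                    System.out.println("consumed " + value);
                }
            } catch (InterruptedException stop) {
                // exit quietly when interrupted
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        // The sending loop is throttled by the consumer once the queue is full.
        for (int i = 1; i <= 10; i++) {
            blockingSend(queue, i);
            System.out.println("sent " + i);
        }
    }
}
```

Note that the consumer side does nothing special in either case; it only takes items from the queue, so a blocking flag placed there would have nothing to act on.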
Re: SEDA and concurrentConsumers
Posted by Tim Dudgeon <td...@gmail.com>.
Great. That works. Thanks.
BTW, the example in the Camel in Action book (p 322) seems wrong then as
it describes it the way I originally had it.
Tim
Re: SEDA and concurrentConsumers
Posted by Willem jiang <wi...@gmail.com>.
You need to set the SEDA endpoint parameters on the first endpoint, because Camel will pick up the already created queue in the next endpoint.
Please change the first route like this:
…
> .log('direct ${body}')
> .to('seda:insert?blockWhenFull=true&size=5')
--
Willem Jiang
FuseSource
Web: http://www.fusesource.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.javaeye.com (Chinese)
Twitter: willemjiang
Weibo: willemjiang