Posted to users@camel.apache.org by Tim Dudgeon <td...@gmail.com> on 2012/09/11 19:51:47 UTC

SEDA and concurrentConsumers

I'm trying to use the SEDA component with the concurrentConsumers option.
I'm setting a queue size and expecting the calling thread to block when
the queue is full, but it's not working for me.
I have Groovy code like this:


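import org.apache.camel.CamelContext
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext

def vals = 1..10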
println vals

CamelContext camelContext = new DefaultCamelContext()
camelContext.addRoutes(new RouteBuilder() {
     def void configure() {
         from('direct:start')
                 .split(body())
                 .log('direct ${body}')
                 .to('seda:insert')

         from('seda:insert?concurrentConsumers=3&blockWhenFull=true&size=5')
                 .delay(100)
                 .log('seda  ${body}')
     }
})
camelContext.start()

def template = camelContext.createProducerTemplate()
println 'starting'
template.sendBody('direct:start', vals)
println 'done'



and it's outputting this:

915 [main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.10.1 (CamelContext: camel-1) started in 0.348 seconds
starting
941 [main] INFO route1 - direct 1
944 [main] INFO route1 - direct 2
944 [main] INFO route1 - direct 3
945 [main] INFO route1 - direct 4
945 [main] INFO route1 - direct 5
945 [main] INFO route1 - direct 6
946 [main] INFO route1 - direct 7
946 [main] INFO route1 - direct 8
946 [main] INFO route1 - direct 9
946 [main] INFO route1 - direct 10
done
2009 [Camel (camel-1) thread #2 - seda://insert] INFO route2 - seda 3
2009 [Camel (camel-1) thread #1 - seda://insert] INFO route2 - seda 2
2009 [Camel (camel-1) thread #0 - seda://insert] INFO route2 - seda 1
2110 [Camel (camel-1) thread #0 - seda://insert] INFO route2 - seda 6
2110 [Camel (camel-1) thread #1 - seda://insert] INFO route2 - seda 4
2110 [Camel (camel-1) thread #2 - seda://insert] INFO route2 - seda 5
2210 [Camel (camel-1) thread #0 - seda://insert] INFO route2 - seda 7
2210 [Camel (camel-1) thread #1 - seda://insert] INFO route2 - seda 8
2210 [Camel (camel-1) thread #2 - seda://insert] INFO route2 - seda 9
2310 [Camel (camel-1) thread #0 - seda://insert] INFO route2 - seda 10



Clearly the calling route is not blocking.
What am I doing wrong?
This is with 2.10.


Tim

Re: SEDA and concurrentConsumers

Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Sep 12, 2012 at 9:31 AM, Tim Dudgeon <td...@gmail.com> wrote:
> OK, thanks. So I think I get it now. Some params need defining on the 'from'
> side and some on the 'to' side. And the order the queue appears in the
> routes is important. So this seems to work for me. Is this correct?
>
>         from('seda:insert?concurrentConsumers=2&size=3')
>                 .delay(1000)
>                 .log('seda ${body}')
>
>         from('direct:start')
>                 .split(body())
>                 .log('direct ${body}')
>                 .to('seda:insert?blockWhenFull=true')
>
>
> For instance, if the two routes are defined in the other order it doesn't
> work as wanted.
>

Yes, the size option matters, because the first endpoint that creates the
queue controls its size.
So if you want to limit the queue, make sure to set size on the first
endpoint. Better yet, set it on all the endpoints.

But the blockWhenFull option applies only to the producer.
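
As a rough plain-Java illustration of why the option belongs on the producer
side (an analogy only, not Camel's source): a SEDA queue is backed by a
java.util.concurrent.BlockingQueue, and it is the thread adding to a full
queue that either fails fast or waits for space. The class and method names
below are made up for this sketch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: names are illustrative and not part of Camel.
class BoundedQueueSketch {

    // Fill a bounded queue, then try a non-blocking add().
    // Returns true if the extra element was rejected.
    static boolean addIsRejectedWhenFull(int capacity) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add(i);
        }
        try {
            queue.add(capacity); // full queue: add() throws instead of waiting
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Fill a bounded queue, then put() one more element while a consumer
    // thread frees a slot after consumerDelayMillis.
    // Returns how long the producing thread waited, in milliseconds.
    static long millisBlockedOnPut(int capacity, long consumerDelayMillis) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(capacity);
        try {
            for (int i = 0; i < capacity; i++) {
                queue.put(i);
            }
            Thread consumer = new Thread(() -> {
                try {
                    Thread.sleep(consumerDelayMillis);
                    queue.take(); // frees one slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            long start = System.nanoTime();
            consumer.start();
            queue.put(capacity); // full queue: put() waits for the consumer
            long waited = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            consumer.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("add() rejected on full queue: " + addIsRejectedWhenFull(5));
        System.out.println("put() waited about " + millisBlockedOnPut(5, 200) + " ms");
    }
}
```

In the routes above, the thread calling template.sendBody plays the role of
the put() caller, which is why blockWhenFull=true has to appear on the
to(...) URI rather than on the from(...) URI.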

Another option is to define the endpoint once, and then refer to it in the routes:

configure() {

   Endpoint insert = endpoint('seda:insert?concurrentConsumers=2&size=3&blockWhenFull=true');

   from(insert)
      ...

     ...
     to(insert)
}





-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: SEDA and concurrentConsumers

Posted by Tim Dudgeon <td...@gmail.com>.
OK, thanks. So I think I get it now. Some params need defining on the 
'from' side and some on the 'to' side. And the order the queue appears 
in the routes is important. So this seems to work for me. Is this correct?

         from('seda:insert?concurrentConsumers=2&size=3')
                 .delay(1000)
                 .log('seda ${body}')

         from('direct:start')
                 .split(body())
                 .log('direct ${body}')
                 .to('seda:insert?blockWhenFull=true')


For instance, if the two routes are defined in the other order it 
doesn't work as wanted.

Tim


On 12/09/2012 07:54, Claus Ibsen wrote:
> On Wed, Sep 12, 2012 at 8:52 AM, Tim Dudgeon <td...@gmail.com> wrote:
>> Great. That works. Thanks.
>> BTW, the example in the Camel in Action book (p 322) seems wrong then as it
>> describes it the way I originally had it.
>>
> The book is not wrong. The block parameter is on the *producer* side,
> so you should have it in the "to".


Re: SEDA and concurrentConsumers

Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Sep 12, 2012 at 8:52 AM, Tim Dudgeon <td...@gmail.com> wrote:
> Great. That works. Thanks.
> BTW, the example in the Camel in Action book (p 322) seems wrong then as it
> describes it the way I originally had it.
>

The book is not wrong. The block parameter is on the *producer* side,
so you should have it in the "to".




Re: SEDA and concurrentConsumers

Posted by Tim Dudgeon <td...@gmail.com>.
Great. That works. Thanks.
BTW, the example in the Camel in Action book (p 322) seems wrong then as 
it describes it the way I originally had it.

Tim


On 12/09/2012 03:34, Willem jiang wrote:
> You need to set the SEDA endpoint parameters on the first endpoint, because Camel will pick up the already-created queue for the next endpoint.
>
> Please change the first route like this
> …
>> .log('direct ${body}')
>> .to('seda:insert?blockWhenFull=true&size=5')


Re: SEDA and concurrentConsumers

Posted by Willem jiang <wi...@gmail.com>.
You need to set the SEDA endpoint parameters on the first endpoint, because Camel will pick up the already-created queue for the next endpoint.

Please change the first route like this:
…
> .log('direct ${body}')
> .to('seda:insert?blockWhenFull=true&size=5')
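
A toy model of that lookup may help (a simplification with made-up names, not
Camel's implementation): queues are kept per name, and whichever endpoint asks
first creates the queue with its own size; later endpoints get the existing
queue and their size is ignored. Treating a missing size as unbounded is an
assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: a per-name queue registry where the first caller wins.
class QueueRegistrySketch {

    private final Map<String, BlockingQueue<Object>> queues =
            new ConcurrentHashMap<String, BlockingQueue<Object>>();

    // Returns the queue registered under name, creating it on first use.
    // size <= 0 stands for "no size given" and yields an unbounded queue
    // (an assumption of this sketch).
    BlockingQueue<Object> getOrCreateQueue(String name, int size) {
        return queues.computeIfAbsent(name, key -> size > 0
                ? new LinkedBlockingQueue<Object>(size)
                : new LinkedBlockingQueue<Object>());
    }

    public static void main(String[] args) {
        // Producer endpoint without size is resolved first, consumer with size=5 second.
        QueueRegistrySketch producerFirst = new QueueRegistrySketch();
        producerFirst.getOrCreateQueue("insert", 0);
        System.out.println("producer first, capacity: "
                + producerFirst.getOrCreateQueue("insert", 5).remainingCapacity());

        // Sized endpoint is resolved first, so the bound sticks.
        QueueRegistrySketch sizedFirst = new QueueRegistrySketch();
        sizedFirst.getOrCreateQueue("insert", 5);
        System.out.println("sized first, capacity: "
                + sizedFirst.getOrCreateQueue("insert", 0).remainingCapacity());
    }
}
```

In this model the original routes correspond to the first case: the plain
'seda:insert' endpoint is created first, so the later size=5 has no effect on
the queue that already exists.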




--  
Willem Jiang

FuseSource
Web: http://www.fusesource.com
Blog: http://willemjiang.blogspot.com (English)
          http://jnn.javaeye.com (Chinese)
Twitter: willemjiang  
Weibo: willemjiang




