You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Ryan Gardner <ry...@gmail.com> on 2009/03/16 22:22:48 UTC

Concurrent processing of messages - How do you configure a seda consumer (or any other transport)?

I am currently using Camel 1.6.0

I have a process in my application where a small number of somewhat  
time-sensitive messages are sent out to various channels (how isn't  
really relevant - could be email, jabber, smoke signals, whatever)  
Rather than process them in a queue, I would like to send them out  
concurrently. The piece that creates the messages has a loop, and in  
the java code it calls I am injecting an endpoint, and then in a loop  
I am sending messages to that endpoint:
	
	@EndpointInject(name="fooEndpoint")
	protected ProducerTemplate fooTemplate

	....

	for(some criteria) {
		...
		 fooTemplate.sendBodyAndHeaders(fooBody, fooHeaders);
	}

This part works fine. The fooEndpoint is created in a Spring DSL:

	<camel:endpoint id="fooEndpoint" uri="seda:fooOutbound" />

A java dsl routebuilder builds the route to process these messages  
like this:

    from(endpoint("foo"))
              .choice()
                 .when 
( header("fooProperty").isEqualTo(SomeEnum.A_VALUE) )
                     .choice()
                         .when 
( header("barProperty").isEqualTo(AnotherEnum.B_VALUE))
                             .to("someEndpoint")
                         .when 
( header("barProperty").isEqualTo(AnotherEnum.C_VALUE))
                             .beanRef("someBean")
                         .when 
( header("phoneMediaType").isEqualTo(PhoneMediaType.TEXT))
                             .beanRef("someOtherBean")
                         .otherwise()
                             .to("log:foo:level=ERROR")

                 .when 
(header("blargProperty").isEqualTo(SomeEnum.D_VALUE))
                     .to("log:outbound-messages?level=INFO")
                 .otherwise()
                     .to("log:outbound-messages?level=ERROR");


I naively assumed that because I was throwing out new messages in the  
loop (sending out a bunch of new messages in the  
fooTempate.sendBodyAndHeaders) it would create a new thread to handle  
them because I was using the seda endpoint. I realized when I saw that  
they were going in a queue that was not the case.

I tried to remedy this by setting the "concurrentConsumers" property  
on the consuming endpoint like this:

from("seda:fooOutbound?concurrentConsumers=8")
	.choice()
	...

but that gave me this exception:

Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to  
resolve endpoint: seda:incidentOutboundContactPending? 
concurrentConsumers=8 due to: There are 1 parameters that couldn't be  
set on the endpoint. Check the uri if the parameters are spelt  
correctly and that they are properties of the endpoint. Unknown  
parameters=[{concurrentConsumers=8}]
	at  
org 
.apache 
.camel.impl.DefaultComponent.createEndpoint(DefaultComponent.java:95)
	at  
org 
.apache 
.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java: 
331)
	... 70 more

When I it up using a thread pool, it didn't seem to be handling it  
concurrently either - but I didn't look at it all that closely. It  
sounded to me like a concurrent consumer setup was more in line with  
what I wanted.

    from(endpoint("foo")).thread(8)
              .choice()

Is there something I'm missing? Any help is appreciated.

Ryan

Re: Concurrent processing of messages - How do you configure a seda consumer (or any other transport)?

Posted by Claus Ibsen <cl...@gmail.com>.
On Mon, Mar 16, 2009 at 10:22 PM, Ryan Gardner <ry...@gmail.com> wrote:
> I am currently using Camel 1.6.0
>
> I have a process in my application where a small number of somewhat
> time-sensitive messages are sent out to various channels (how isn't really
> relevant - could be email, jabber, smoke signals, whatever) Rather than
> process them in a queue, I would like to send them out concurrently. The
> piece that creates the messages has a loop, and in the java code it calls I
> am injecting an endpoint, and then in a loop I am sending messages to that
> endpoint:
>
>        @EndpointInject(name="fooEndpoint")
>        protected ProducerTemplate fooTemplate
>
>        ....
>
>        for(some criteria) {
>                ...
>                 fooTemplate.sendBodyAndHeaders(fooBody, fooHeaders);
>        }
>
> This part works fine. The fooEndpoint is created in a Spring DSL:
>
>        <camel:endpoint id="fooEndpoint" uri="seda:fooOutbound" />
>
> A java dsl routebuilder builds the route to process these messages like
> this:
>
>   from(endpoint("foo"))
>             .choice()
>                .when( header("fooProperty").isEqualTo(SomeEnum.A_VALUE) )
>                    .choice()
>                        .when(
> header("barProperty").isEqualTo(AnotherEnum.B_VALUE))
>                            .to("someEndpoint")
>                        .when(
> header("barProperty").isEqualTo(AnotherEnum.C_VALUE))
>                            .beanRef("someBean")
>                        .when(
> header("phoneMediaType").isEqualTo(PhoneMediaType.TEXT))
>                            .beanRef("someOtherBean")
>                        .otherwise()
>                            .to("log:foo:level=ERROR")
>
>                .when(header("blargProperty").isEqualTo(SomeEnum.D_VALUE))
>                    .to("log:outbound-messages?level=INFO")
>                .otherwise()
>                    .to("log:outbound-messages?level=ERROR");
>
>
> I naively assumed that because I was throwing out new messages in the loop
> (sending out a bunch of new messages in the fooTempate.sendBodyAndHeaders)
> it would create a new thread to handle them because I was using the seda
> endpoint. I realized when I saw that they were going in a queue that was not
> the case.
>
> I tried to remedy this by setting the "concurrentConsumers" property on the
> consuming endpoint like this:
>
> from("seda:fooOutbound?concurrentConsumers=8")
>        .choice()
>        ...
>
> but that gave me this exception:
>
> Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to
> resolve endpoint: seda:incidentOutboundContactPending?concurrentConsumers=8
> due to: There are 1 parameters that couldn't be set on the endpoint. Check
> the uri if the parameters are spelt correctly and that they are properties
> of the endpoint. Unknown parameters=[{concurrentConsumers=8}]
>        at
> org.apache.camel.impl.DefaultComponent.createEndpoint(DefaultComponent.java:95)
>        at
> org.apache.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java:331)
>        ... 70 more
This option is not in 1.6.0 but in the next release 1.6.1 or 2.0.


>
> When I it up using a thread pool, it didn't seem to be handling it
> concurrently either - but I didn't look at it all that closely. It sounded
> to me like a concurrent consumer setup was more in line with what I wanted.
>
>   from(endpoint("foo")).thread(8)
>             .choice()
>
> Is there something I'm missing? Any help is appreciated.
>
> Ryan
>



-- 
Claus Ibsen
Apache Camel Committer

Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/