Posted to users@camel.apache.org by Ryan Gardner <ry...@gmail.com> on 2009/03/16 22:22:48 UTC
Concurrent processing of messages - How do you configure a seda consumer (or any other transport)?
I am currently using Camel 1.6.0
I have a process in my application where a small number of somewhat
time-sensitive messages are sent out to various channels (how isn't
really relevant - could be email, jabber, smoke signals, whatever)
Rather than process them in a queue, I would like to send them out
concurrently. The piece that creates the messages has a loop, and in
the java code it calls I am injecting an endpoint, and then in a loop
I am sending messages to that endpoint:
    @EndpointInject(name="fooEndpoint")
    protected ProducerTemplate fooTemplate;
    ....
    for(some criteria) {
        ...
        fooTemplate.sendBodyAndHeaders(fooBody, fooHeaders);
    }
This part works fine. The fooEndpoint is created in a Spring DSL:
<camel:endpoint id="fooEndpoint" uri="seda:fooOutbound" />
A java dsl routebuilder builds the route to process these messages
like this:
    from(endpoint("foo"))
        .choice()
            .when(header("fooProperty").isEqualTo(SomeEnum.A_VALUE))
                .choice()
                    .when(header("barProperty").isEqualTo(AnotherEnum.B_VALUE))
                        .to("someEndpoint")
                    .when(header("barProperty").isEqualTo(AnotherEnum.C_VALUE))
                        .beanRef("someBean")
                    .when(header("phoneMediaType").isEqualTo(PhoneMediaType.TEXT))
                        .beanRef("someOtherBean")
                    .otherwise()
                        .to("log:foo:level=ERROR")
            .when(header("blargProperty").isEqualTo(SomeEnum.D_VALUE))
                .to("log:outbound-messages?level=INFO")
            .otherwise()
                .to("log:outbound-messages?level=ERROR");
I naively assumed that because I was sending new messages in the loop
(a series of fooTemplate.sendBodyAndHeaders calls), the seda endpoint
would create a new thread to handle each one. When I saw the messages
going into a queue, I realized that was not the case.
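A plain-Java sketch of the behaviour described above, assuming the queue is drained by a single consumer thread (an assumption for illustration, not taken from Camel's source). The class and method names are hypothetical and no Camel API is used. The producer loop returns at once, yet every message is handled in order by the same thread:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class SingleConsumerQueueDemo {
    // Hypothetical marker telling the consumer thread to stop.
    private static final String STOP = "__STOP__";

    /** Pushes the messages through an in-memory queue drained by one thread. */
    static List<String> process(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new CopyOnWriteArrayList<>();

        // The only consumer: takes one message at a time, in FIFO order.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take();
                    if (STOP.equals(msg)) {
                        return;
                    }
                    handled.add(Thread.currentThread().getName() + ":" + msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-1");
        consumer.start();

        // The producer loop only enqueues; it does not wait for processing.
        for (String message : messages) {
            queue.add(message);
        }
        queue.add(STOP);

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        // prints [consumer-1:a, consumer-1:b, consumer-1:c]
        System.out.println(process(Arrays.asList("a", "b", "c")));
    }
}
```

Sending in a loop therefore does not make the handling concurrent in this sketch; only the number of threads draining the queue does.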
I tried to remedy this by setting the "concurrentConsumers" property
on the consuming endpoint like this:
from("seda:fooOutbound?concurrentConsumers=8")
.choice()
...
but that gave me this exception:
Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to
resolve endpoint: seda:incidentOutboundContactPending?concurrentConsumers=8
due to: There are 1 parameters that couldn't be set on the endpoint. Check
the uri if the parameters are spelt correctly and that they are properties
of the endpoint. Unknown parameters=[{concurrentConsumers=8}]
    at org.apache.camel.impl.DefaultComponent.createEndpoint(DefaultComponent.java:95)
    at org.apache.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java:331)
    ... 70 more
When I set it up using a thread pool instead, it didn't seem to handle
the messages concurrently either, though I didn't look at it all that
closely. A concurrent consumer setup sounded more in line with what I
wanted.
from(endpoint("foo")).thread(8)
.choice()
Is there something I'm missing? Any help is appreciated.
Ryan
Re: Concurrent processing of messages - How do you configure a seda
consumer (or any other transport)?
Posted by Claus Ibsen <cl...@gmail.com>.
On Mon, Mar 16, 2009 at 10:22 PM, Ryan Gardner <ry...@gmail.com> wrote:
> I am currently using Camel 1.6.0
>
> I have a process in my application where a small number of somewhat
> time-sensitive messages are sent out to various channels (how isn't really
> relevant - could be email, jabber, smoke signals, whatever) Rather than
> process them in a queue, I would like to send them out concurrently. The
> piece that creates the messages has a loop, and in the java code it calls I
> am injecting an endpoint, and then in a loop I am sending messages to that
> endpoint:
>
> @EndpointInject(name="fooEndpoint")
> protected ProducerTemplate fooTemplate
>
> ....
>
> for(some criteria) {
> ...
> fooTemplate.sendBodyAndHeaders(fooBody, fooHeaders);
> }
>
> This part works fine. The fooEndpoint is created in a Spring DSL:
>
> <camel:endpoint id="fooEndpoint" uri="seda:fooOutbound" />
>
> A java dsl routebuilder builds the route to process these messages like
> this:
>
> from(endpoint("foo"))
> .choice()
> .when( header("fooProperty").isEqualTo(SomeEnum.A_VALUE) )
> .choice()
> .when(
> header("barProperty").isEqualTo(AnotherEnum.B_VALUE))
> .to("someEndpoint")
> .when(
> header("barProperty").isEqualTo(AnotherEnum.C_VALUE))
> .beanRef("someBean")
> .when(
> header("phoneMediaType").isEqualTo(PhoneMediaType.TEXT))
> .beanRef("someOtherBean")
> .otherwise()
> .to("log:foo:level=ERROR")
>
> .when(header("blargProperty").isEqualTo(SomeEnum.D_VALUE))
> .to("log:outbound-messages?level=INFO")
> .otherwise()
> .to("log:outbound-messages?level=ERROR");
>
>
> I naively assumed that because I was throwing out new messages in the loop
> (sending out a bunch of new messages in the fooTemplate.sendBodyAndHeaders)
> it would create a new thread to handle them because I was using the seda
> endpoint. I realized when I saw that they were going in a queue that was not
> the case.
>
> I tried to remedy this by setting the "concurrentConsumers" property on the
> consuming endpoint like this:
>
> from("seda:fooOutbound?concurrentConsumers=8")
> .choice()
> ...
>
> but that gave me this exception:
>
> Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to
> resolve endpoint: seda:incidentOutboundContactPending?concurrentConsumers=8
> due to: There are 1 parameters that couldn't be set on the endpoint. Check
> the uri if the parameters are spelt correctly and that they are properties
> of the endpoint. Unknown parameters=[{concurrentConsumers=8}]
> at org.apache.camel.impl.DefaultComponent.createEndpoint(DefaultComponent.java:95)
> at org.apache.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java:331)
> ... 70 more
The concurrentConsumers option is not available in 1.6.0. It is in the
next release, 1.6.1 or 2.0.
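
For readers who want to see what several consumers on one queue buy you, here is a minimal plain-Java sketch. It does not use Camel and is not Camel's implementation; the class and method names are hypothetical. Each handler waits on a latch until every message is in flight, so the method returns true only when all messages were being processed at the same time:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ConcurrentConsumersDemo {
    // Hypothetical marker telling a consumer thread to stop.
    private static final String STOP = "__STOP__";

    /** Returns true only if all messages were in flight at the same moment. */
    static boolean allInFlightTogether(int consumers, int messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch inFlight = new CountDownLatch(messages);
        AtomicBoolean overlapped = new AtomicBoolean(true);

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String msg = queue.take();
                        if (STOP.equals(msg)) {
                            return;
                        }
                        // Stand-in for real work: wait until every message
                        // has been picked up, or give up after one second.
                        inFlight.countDown();
                        if (!inFlight.await(1, TimeUnit.SECONDS)) {
                            overlapped.set(false);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "consumer-" + i);
            threads.add(t);
            t.start();
        }

        for (int i = 0; i < messages; i++) {
            queue.add("msg-" + i);
        }
        for (int i = 0; i < consumers; i++) {
            queue.add(STOP); // one stop marker per consumer
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return overlapped.get();
    }

    public static void main(String[] args) {
        System.out.println("8 consumers: " + allInFlightTogether(8, 8)); // 8 consumers: true
        System.out.println("1 consumer: " + allInFlightTogether(1, 2));  // 1 consumer: false
    }
}
```

With a single consumer the first handler times out because nothing else can pick up the second message, which mirrors the queued behaviour reported above.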
>
> When I set it up using a thread pool, it didn't seem to be handling it
> concurrently either - but I didn't look at it all that closely. It sounded
> to me like a concurrent consumer setup was more in line with what I wanted.
>
> from(endpoint("foo")).thread(8)
> .choice()
>
> Is there something I'm missing? Any help is appreciated.
>
> Ryan
>
--
Claus Ibsen
Apache Camel Committer
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/