You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Andrew Chandler <an...@riftware.com> on 2010/02/25 23:36:37 UTC

Split, custom threadpools, aggregationstrategy

I'm hoping someone can help me out - I'm relatively new to camel,
however I've googled and tried reading the documentation.    What I'm
trying to do ultimately is  take a message, send it through a splitter
( a custom one in the end), and send it to processors in parallel and
then use a custom aggregate.   This needs to happen in parallel and
there will be multiple instances of the route in question running.


What I've done is setup a proof of concept proto-type.    What has me
stumped is I had planned on sending in a ThreadPoolExecutor so that I
could control thread core, max and timeout values.   In Camel 2.2 this
doesn't seem to be an option in spite of what the documentation says.
As a sample here is the beginings of my process (no aggregation done
yet)


           context.addRoutes(new RouteBuilder() {

                public void configure() {

//                    ThreadPoolExecutor threadPoolExecutor =
//                            new ThreadPoolExecutor(8, 150, 30L,
TimeUnit.SECONDS, new LinkedBlockingQueue());                    



from("activemq:queue:sping-in.queue").split().method(mySplitterBean,
"splitBody")
                              .parallelProcessing().threads(150).process(new Processor() {
                                     public void process(Exchange arg0)
throws Exception {
                                         System.out.println("Exchange :"
+ arg0.toString());
                                         PingerBean pBean = new
PingerBean();

pBean.ping((String)arg0.getIn().getBody());
                                    }
                                });
                
                }
            });


When you look at http://camel.apache.org/splitter.html    and search for
"Specifying a custom ThreadPoolExecutor" you will see a java snippet
that says you should be able to do this
from("activemq:my.queue").split(xPathBuilder, true,
threadPoolExecutor).to("activemq:my.parts");     However my code
completion in the ide only admits to these choices on .split:
split()  ExpressionClause <SplitDefinition>
split(Expression)
split(Expression, AggregationStrategy)

My working example uses .threads(150) but that doesn't let me control,
min, max, and timeout values on the threadpool.    Also its unclear to
me if the .threads(150) is a TOTAL of 150 threads for ALL instances of
this route that are running at the same time, or is it a pool for just
this route?   (In which case I don't need 150 - I need more like 5)    


Any help is appreciated.



Re: Split, custom threadpools, aggregationstrategy

Posted by Claus Ibsen <cl...@gmail.com>.
On Fri, Feb 26, 2010 at 3:05 PM, Andrew Chandler <an...@riftware.com> wrote:
> Thank you very much, - do you know how I would go about l.etting the
> powers that be know the documentation needs an update for the splitter
> page?

I update the wiki page also.

> On Fri, 2010-02-26 at 06:48 +0100, Claus Ibsen wrote:
>
>> Hi
>>
>> Use the method executeService on the split DSL.
>>
>> eg
>>
>>  from("activemq:queue:sping-in.queue").split().method(mySplitterBean,
>> "splitBody")
>>
>> .parallelProcessing().executeService(threadPoolExecutor).process(new
>> Processor() {
>>
>>
>> On Thu, Feb 25, 2010 at 11:36 PM, Andrew Chandler <an...@riftware.com> wrote:
>> > I'm hoping someone can help me out - I'm relatively new to camel,
>> > however I've googled and tried reading the documentation.    What I'm
>> > trying to do ultimately is  take a message, send it through a splitter
>> > ( a custom one in the end), and send it to processors in parallel and
>> > then use a custom aggregate.   This needs to happen in parallel and
>> > there will be multiple instances of the route in question running.
>> >
>> >
>> > What I've done is setup a proof of concept proto-type.    What has me
>> > stumped is I had planned on sending in a ThreadPoolExecutor so that I
>> > could control thread core, max and timeout values.   In Camel 2.2 this
>> > doesn't seem to be an option in spite of what the documentation says.
>> > As a sample here is the beginings of my process (no aggregation done
>> > yet)
>> >
>> >
>> >           context.addRoutes(new RouteBuilder() {
>> >
>> >                public void configure() {
>> >
>> > //                    ThreadPoolExecutor threadPoolExecutor =
>> > //                            new ThreadPoolExecutor(8, 150, 30L,
>> > TimeUnit.SECONDS, new LinkedBlockingQueue());
>> >
>> >
>> >
>> > from("activemq:queue:sping-in.queue").split().method(mySplitterBean,
>> > "splitBody")
>> >                              .parallelProcessing().threads(150).process(new Processor() {
>> >                                     public void process(Exchange arg0)
>> > throws Exception {
>> >                                         System.out.println("Exchange :"
>> > + arg0.toString());
>> >                                         PingerBean pBean = new
>> > PingerBean();
>> >
>> > pBean.ping((String)arg0.getIn().getBody());
>> >                                    }
>> >                                });
>> >
>> >                }
>> >            });
>> >
>> >
>> > When you look at http://camel.apache.org/splitter.html    and search for
>> > "Specifying a custom ThreadPoolExecutor" you will see a java snippet
>> > that says you should be able to do this
>> > from("activemq:my.queue").split(xPathBuilder, true,
>> > threadPoolExecutor).to("activemq:my.parts");     However my code
>> > completion in the ide only admits to these choices on .split:
>> > split()  ExpressionClause <SplitDefinition>
>> > split(Expression)
>> > split(Expression, AggregationStrategy)
>> >
>> > My working example uses .threads(150) but that doesn't let me control,
>> > min, max, and timeout values on the threadpool.    Also its unclear to
>> > me if the .threads(150) is a TOTAL of 150 threads for ALL instances of
>> > this route that are running at the same time, or is it a pool for just
>> > this route?   (In which case I don't need 150 - I need more like 5)
>> >
>> >
>> > Any help is appreciated.
>> >
>> >
>> >
>>
>>
>>
>
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Split, custom threadpools, aggregationstrategy

Posted by Andrew Chandler <an...@riftware.com>.
Thank you very much, - do you know how I would go about l.etting the
powers that be know the documentation needs an update for the splitter
page?  
On Fri, 2010-02-26 at 06:48 +0100, Claus Ibsen wrote:

> Hi
> 
> Use the method executeService on the split DSL.
> 
> eg
> 
>  from("activemq:queue:sping-in.queue").split().method(mySplitterBean,
> "splitBody")
> 
> .parallelProcessing().executeService(threadPoolExecutor).process(new
> Processor() {
> 
> 
> On Thu, Feb 25, 2010 at 11:36 PM, Andrew Chandler <an...@riftware.com> wrote:
> > I'm hoping someone can help me out - I'm relatively new to camel,
> > however I've googled and tried reading the documentation.    What I'm
> > trying to do ultimately is  take a message, send it through a splitter
> > ( a custom one in the end), and send it to processors in parallel and
> > then use a custom aggregate.   This needs to happen in parallel and
> > there will be multiple instances of the route in question running.
> >
> >
> > What I've done is setup a proof of concept proto-type.    What has me
> > stumped is I had planned on sending in a ThreadPoolExecutor so that I
> > could control thread core, max and timeout values.   In Camel 2.2 this
> > doesn't seem to be an option in spite of what the documentation says.
> > As a sample here is the beginings of my process (no aggregation done
> > yet)
> >
> >
> >           context.addRoutes(new RouteBuilder() {
> >
> >                public void configure() {
> >
> > //                    ThreadPoolExecutor threadPoolExecutor =
> > //                            new ThreadPoolExecutor(8, 150, 30L,
> > TimeUnit.SECONDS, new LinkedBlockingQueue());
> >
> >
> >
> > from("activemq:queue:sping-in.queue").split().method(mySplitterBean,
> > "splitBody")
> >                              .parallelProcessing().threads(150).process(new Processor() {
> >                                     public void process(Exchange arg0)
> > throws Exception {
> >                                         System.out.println("Exchange :"
> > + arg0.toString());
> >                                         PingerBean pBean = new
> > PingerBean();
> >
> > pBean.ping((String)arg0.getIn().getBody());
> >                                    }
> >                                });
> >
> >                }
> >            });
> >
> >
> > When you look at http://camel.apache.org/splitter.html    and search for
> > "Specifying a custom ThreadPoolExecutor" you will see a java snippet
> > that says you should be able to do this
> > from("activemq:my.queue").split(xPathBuilder, true,
> > threadPoolExecutor).to("activemq:my.parts");     However my code
> > completion in the ide only admits to these choices on .split:
> > split()  ExpressionClause <SplitDefinition>
> > split(Expression)
> > split(Expression, AggregationStrategy)
> >
> > My working example uses .threads(150) but that doesn't let me control,
> > min, max, and timeout values on the threadpool.    Also its unclear to
> > me if the .threads(150) is a TOTAL of 150 threads for ALL instances of
> > this route that are running at the same time, or is it a pool for just
> > this route?   (In which case I don't need 150 - I need more like 5)
> >
> >
> > Any help is appreciated.
> >
> >
> >
> 
> 
> 



Re: Split, custom threadpools, aggregationstrategy

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Use the method executeService on the split DSL.

eg

 from("activemq:queue:sping-in.queue").split().method(mySplitterBean,
"splitBody")

.parallelProcessing().executeService(threadPoolExecutor).process(new
Processor() {


On Thu, Feb 25, 2010 at 11:36 PM, Andrew Chandler <an...@riftware.com> wrote:
> I'm hoping someone can help me out - I'm relatively new to camel,
> however I've googled and tried reading the documentation.    What I'm
> trying to do ultimately is  take a message, send it through a splitter
> ( a custom one in the end), and send it to processors in parallel and
> then use a custom aggregate.   This needs to happen in parallel and
> there will be multiple instances of the route in question running.
>
>
> What I've done is setup a proof of concept proto-type.    What has me
> stumped is I had planned on sending in a ThreadPoolExecutor so that I
> could control thread core, max and timeout values.   In Camel 2.2 this
> doesn't seem to be an option in spite of what the documentation says.
> As a sample here is the beginings of my process (no aggregation done
> yet)
>
>
>           context.addRoutes(new RouteBuilder() {
>
>                public void configure() {
>
> //                    ThreadPoolExecutor threadPoolExecutor =
> //                            new ThreadPoolExecutor(8, 150, 30L,
> TimeUnit.SECONDS, new LinkedBlockingQueue());
>
>
>
> from("activemq:queue:sping-in.queue").split().method(mySplitterBean,
> "splitBody")
>                              .parallelProcessing().threads(150).process(new Processor() {
>                                     public void process(Exchange arg0)
> throws Exception {
>                                         System.out.println("Exchange :"
> + arg0.toString());
>                                         PingerBean pBean = new
> PingerBean();
>
> pBean.ping((String)arg0.getIn().getBody());
>                                    }
>                                });
>
>                }
>            });
>
>
> When you look at http://camel.apache.org/splitter.html    and search for
> "Specifying a custom ThreadPoolExecutor" you will see a java snippet
> that says you should be able to do this
> from("activemq:my.queue").split(xPathBuilder, true,
> threadPoolExecutor).to("activemq:my.parts");     However my code
> completion in the ide only admits to these choices on .split:
> split()  ExpressionClause <SplitDefinition>
> split(Expression)
> split(Expression, AggregationStrategy)
>
> My working example uses .threads(150) but that doesn't let me control,
> min, max, and timeout values on the threadpool.    Also its unclear to
> me if the .threads(150) is a TOTAL of 150 threads for ALL instances of
> this route that are running at the same time, or is it a pool for just
> this route?   (In which case I don't need 150 - I need more like 5)
>
>
> Any help is appreciated.
>
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus