Posted to users@camel.apache.org by jonathanq <jq...@abebooks.com> on 2010/06/09 01:12:10 UTC

Split an exchange, process pieces in parallel and then wait for all to complete

Hello,

I am trying to get my head around how to construct a specific route in Camel
2.3. I am creating a process that listens on a "trigger" directory for a
new file, which starts the process.

Once it receives the "go" message, it will look at a directory and
start processing all of the files in that directory. It splits
those files into smaller files and compresses them.

Once it has finished processing all of the files, we need to send a single
output message saying it's done.

Here is the general idea:

from("file://trigger")
    .process(getListOfFilesInDirectoryProcessor())
    .split(body())
    .threads(10)
    .process(getFileSplitProcessor())
    .log("processing of all input files complete");

Obviously, with the above route we get one log message per input file (the
log() call runs once for every split piece), which we don't want.

Another idea was to use SEDA queues and an aggregator. This works, but it
relies on knowing how many input files there were in order to set batchSize()
on the aggregator, and it also requires a batchTimeout(). It is not ideal,
so I am wondering if there is a better way.

from("file://trigger")
    .process(getListOfFilesInDirectoryProcessor())
    .split(body())
    .to("seda:atomicFiles");

from("seda:atomicFiles")
    .threads(10)
    .process(getFileSplitProcessor())
    .log("File thread completed")
    .to("seda:aggregate");

from("seda:aggregate")
    .aggregate(header("extractId"))
        .batchSize(2)  // this needs to be the same as the number of input files
        .batchTimeout(10000000)
        .log("Processing of input files complete.");
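The number that batchSize() has to guess is actually known at split time: it is the size of the list being split. A minimal plain-JDK sketch of that idea follows. It uses java.util.concurrent, not Camel's API, and SplitCompletion, processAll and demo are hypothetical names for illustration only. A counter starts at the number of pieces, each worker counts itself down when it finishes (even if its work throws), and whichever worker reaches zero fires the single "done" callback, so there is no fixed batch size and no timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper, not part of Camel: completion is driven by the number
// of pieces produced by the split instead of a configured batch size.
class SplitCompletion {

    // Runs 'work' on every piece using 'pool' and calls 'onDone' exactly once,
    // after the last piece has finished.
    static void processAll(List<String> pieces, Consumer<String> work,
                           Runnable onDone, ExecutorService pool) {
        if (pieces.isEmpty()) {
            onDone.run();                      // nothing to wait for
            return;
        }
        AtomicInteger remaining = new AtomicInteger(pieces.size());
        for (String piece : pieces) {
            pool.execute(() -> {
                try {
                    work.accept(piece);
                } finally {
                    // Whichever thread finishes the last piece fires the callback.
                    if (remaining.decrementAndGet() == 0) {
                        onDone.run();
                    }
                }
            });
        }
    }

    // Demo: returns {pieces processed when onDone ran, number of onDone calls}.
    static int[] demo(int fileCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> files = new ArrayList<>();
        for (int i = 0; i < fileCount; i++) {
            files.add("file-" + i);
        }
        AtomicInteger processed = new AtomicInteger();
        AtomicInteger seenAtDone = new AtomicInteger(-1);
        AtomicInteger doneCalls = new AtomicInteger();
        processAll(files,
                f -> processed.incrementAndGet(),
                () -> {
                    seenAtDone.set(processed.get());
                    doneCalls.incrementAndGet();
                },
                pool);
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {seenAtDone.get(), doneCalls.get()};
    }
}
```

This is callback style: nothing blocks while waiting. The trade-off is that onDone runs on whichever pool thread finished last, not on the thread that started the split.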

I have tried to get my head around the new async package in Camel 2.x, but
I am having trouble understanding how to use it for request/reply.

Is there an easier way to do what I want? The Async documentation goes into
detail on how to use callbacks, none of which seem to apply to DSL routes.
The threads() function seems to be the DSL counterpart, but the documentation
doesn't describe how to do request/reply with it. The example route just
splits to multiple threads and it's done.

What I am doing seems to be a common problem: get one input message, split
the work into smaller chunks to be processed simultaneously, and then
continue once all pieces are done. In other words, this EIP:
http://camel.apache.org/composed-message-processor.html

The difference here is that I want the same processing applied to every
piece, just multi-threaded, whereas that EIP is more about applying
different processing to each piece.
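The "same work on every piece, in parallel, then carry on once" shape can be sketched in plain Java with ExecutorService.invokeAll, which returns only after every submitted task has completed. This is an illustration with java.util.concurrent, not how Camel implements it. ScatterGather is a hypothetical name, and compress() is a hypothetical stand-in for getFileSplitProcessor() that just returns the file name's length so the result is easy to check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ScatterGather {

    // Hypothetical per-piece work: stands in for splitting and compressing one file.
    static int compress(String fileName) {
        return fileName.length();
    }

    // Applies the same work to every piece on a pool of maxThreads threads,
    // waits for all of them, then produces exactly one summary message.
    static String run(List<String> files, int maxThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String f : files) {
                tasks.add(() -> compress(f));
            }
            // invokeAll blocks until every task has completed (normally or not).
            List<Future<Integer>> results = pool.invokeAll(tasks);
            int total = 0;
            for (Future<Integer> r : results) {
                total += r.get();   // a failed piece surfaces here as ExecutionException
            }
            return "processed " + files.size() + " files, total size " + total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for pieces", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a piece failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, run(Arrays.asList("a.txt", "bb.txt", "ccc.txt"), 2) returns "processed 3 files, total size 18" using a two-thread pool. The calling thread blocks until the last piece is done, which matches the "wait for all to complete" requirement.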

Thanks!

Jonathan
-- 
View this message in context: http://old.nabble.com/Split-an-exchange%2C-process-pieces-in-parallel-and-then-wait-for-all-to-complete-tp28824028p28824028.html
Sent from the Camel - Users mailing list archive at Nabble.com.


Re: Split an exchange, process pieces in parallel and then wait for all to complete

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Chapters 8 and 10 of the Camel in Action book cover the Splitter EIP in
detail, including how to use it with concurrency to split in parallel.

There are also some details on the Camel wiki page:
http://camel.apache.org/splitter.html

The Splitter has a built-in aggregator, so it can be used to create the
single outgoing file you want.
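In Camel 2.x the splitter's aggregation hook is an AggregationStrategy, whose aggregate(oldExchange, newExchange) method is called as pieces finish and receives null as oldExchange the first time. The plain-Java sketch below copies only that merge contract, using strings instead of Exchanges. SplitAggregation, Merge, aggregateAll and report are hypothetical names, not Camel API; the point is just to show how many piece results fold into one outgoing message.

```java
import java.util.List;

class SplitAggregation {

    // Mirrors the shape of a merge callback: 'previous' is null for the first piece.
    interface Merge<T> {
        T merge(T previous, T next);
    }

    // Folds every piece result into a single result, one piece at a time.
    static <T> T aggregateAll(List<T> pieceResults, Merge<T> strategy) {
        T result = null;
        for (T piece : pieceResults) {
            result = strategy.merge(result, piece);
        }
        return result;
    }

    // Example strategy: join the names of the compressed files into one report line.
    static String report(List<String> compressed) {
        return aggregateAll(compressed,
                (prev, next) -> prev == null ? next : prev + "," + next);
    }
}
```

report(Arrays.asList("a.gz", "b.gz", "c.gz")) returns "a.gz,b.gz,c.gz", and an empty list yields null because the strategy is never called. If pieces finish in parallel, the merge order may not match the split order, so a strategy that only counts or sums is the least fragile choice.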


-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Split an exchange, process pieces in parallel and then wait for all to complete

Posted by Max Ullinger <Ma...@ptv.de>.
Hi.

Check out http://camel.apache.org/splitter.html, especially the example "Split aggregate request/reply sample".
This example seems to show how a single message can be sent after parallel processing.

I am sure you have already read the documentation, so I don't know if that helps.

Cheers,
Max


-----Original Message-----
From: Willem Jiang [mailto:willem.jiang@gmail.com]
Sent: Wednesday, June 9, 2010 05:36
To: users@camel.apache.org
Subject: Re: Split an exchange, process pieces in parallel and then wait for all to complete

Hi,

The Camel splitter supports processing the split messages in parallel.
You can use the parallelProcessing() DSL option to enable it:

from("file://trigger")
    .process(getListOfFilesInDirectoryProcessor())
    .split(body()).parallelProcessing()
...

Willem



Re: Split an exchange, process pieces in parallel and then wait for all to complete

Posted by Willem Jiang <wi...@gmail.com>.
Hi,

The Camel splitter supports processing the split messages in parallel.
You can use the parallelProcessing() DSL option to enable it:

from("file://trigger")
    .process(getListOfFilesInDirectoryProcessor())
    .split(body()).parallelProcessing()
...

Willem



Re: Split an exchange, process pieces in parallel and then wait for all to complete

Posted by jonathanq <jq...@abebooks.com>.
Thank you for the replies, that is exactly what I wanted. I hadn't dug
far enough into the Splitter documentation to find the parallel processing
option.

I added .executorService() so I could cap the number of threads.
Thank you!

Jonathan
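Capping the threads works because a fixed-size pool never runs more tasks at once than it has threads. The plain-JDK sketch below checks that: it submits many short tasks to Executors.newFixedThreadPool(cap) and records the highest number running at the same time (ThreadCap and maxObservedConcurrency are hypothetical names). A pool built this way is the kind of ExecutorService one would hand to .executorService(); how Camel uses it internally is not shown here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadCap {

    // Runs 'tasks' short jobs on a pool of 'cap' threads and returns the
    // highest number of jobs observed running at the same time.
    static int maxObservedConcurrency(int cap, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(cap);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);   // remember the highest value seen
                try {
                    Thread.sleep(5);                     // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

maxObservedConcurrency(3, 30) should never return more than 3; the exact value depends on scheduling.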