Posted to users@camel.apache.org by Aaron Daubman <da...@gmail.com> on 2012/04/08 04:14:19 UTC

Looking for advice or help on batching/queueing for a fairly simple route

Greetings,

I have the following quite-simple route:

---snip---
BeanIODataFormat dataCoreFormatTest =
        new BeanIODataFormat("beanio/mappings.xml", "dataCoreFileTest");
dataCoreFormatTest.setIgnoreInvalidRecords(true); // ignore invalid records

StringBuilder fileUri = new StringBuilder("file://");
fileUri.append(props.get("ingestDir"))
        .append("?autoCreate=false")
        .append("&include=" + props.get("filePattern"))
        .append("&sortBy=" + props.get("fileSort"))
        .append("&preMove=" + props.get("progressDir"))
        .append("&move=" + props.get("completedDir"))
        .append("&moveFailed=" + props.get("failedDir"))
        .append("&runLoggingLevel=" + props.get("ingestLogLevel"))
        .append("&startingDirectoryMustExist=true");

// Read in log files after they have been rotated,
// starting with the oldest in the directory
from(fileUri.toString())
    .log("Consuming file ${file:name}")
    // use BeanIO to read in the CSV and create an array of dataCore
    // objects, one per record in the log file
    .unmarshal(dataCoreFormatTest)
    // use MyBatis to insert the dataCore objects into the MySQL DB
    .to("mybatis:batchInsertdataCore?statementType=InsertList");
---snip---

My issues are that:
1) There could be many files when the app starts up, and I would like to
process them one at a time. I see that file2 supports maxMessagesPerPoll,
however I don't think that will work, since I need to process in strict
oldest-file-first order and the note on maxMessagesPerPoll says that the
limit is applied BEFORE sorting, so if I use it, I am not guaranteed to get
the oldest file in the dir each time.
- How do I pass the contents of just one file at a time to be unmarshalled
by BeanIO while ensuring strict ordering (oldest files processed first)?

2) There could be many records in a single file, however I would like to
cap how many records are turned into objects at a time, for multiple reasons:
- memory management - I do not want to create an unbounded number of objects
- sorting performance - I will be adding a call to ensure time-ordering of
records after unmarshalling and before database insertion
- database performance - so that mybatis gets passed a list of no more
than X (say, 5000) objects to write to the database at a time

I feel like I have read about built-in support that would enable things
like this, however, after reading through the EIPs again, the closest
seems to be the 'throttling' pattern, which only performs time-based
limitation, whereas I want an absolute limit (irrespective of time).
I think I should be able to use the splitter and aggregator for this
(perhaps together) but, if so, am looking for guidance as to how best to
use one or both to achieve the desired results. Finally, is there a way to
apply maxMessagesPerPoll that would do this without affecting sorting?

pseudo-code would look something like:
---pseudo-code---
// read one file at a time from the dir, ensuring the oldest file
// in the dir is always the one being read
from(dirOfFiles)
  // can't use maxMessagesPerPoll since that will not perform the
  // desired sorting; how to limit to a single file?
  .limit(1)
  // create ArrayList of objects from the file's csv content
  .unmarshal(dataCoreFormatTest)
  // sort the ArrayList in time order using a custom comparator
  .sort(body(), new MyReverseComparator())
  // buffer messages, sending up to the first 5000 after the sort on to
  // the next processor; again, would like to use maxMessagesPerPoll
  // here, but not sure how
  .limit(5000)
  // finally insert the 5000 received messages into the database
  .to("mybatis...")
---pseudo-code---
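
To make the sort-then-limit step of the pseudo-code concrete, here is a
minimal sketch in plain Java of what it is meant to do: sort the
unmarshalled records, then cut them into lists of at most N elements.
RecordBatcher and sortAndBatch are made-up names for illustration, not
Camel API, and the integers in main stand in for dataCore records. The
sketch only covers the database batch size; it does not bound memory during
unmarshalling, because the whole file has already been read by then.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper: sorts records and cuts them into bounded batches. */
final class RecordBatcher {

    /** Sorts a copy of records, then splits it into lists of at most batchSize. */
    static <T> List<List<T>> sortAndBatch(List<T> records,
                                          Comparator<? super T> order,
                                          int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> sorted = new ArrayList<>(records); // leave the caller's list untouched
        sorted.sort(order);

        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < sorted.size(); from += batchSize) {
            int to = Math.min(from + batchSize, sorted.size());
            // copy the slice so each batch is independent of the sorted list
            batches.add(new ArrayList<>(sorted.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // stand-in timestamps; real code would pass dataCore objects
        List<Integer> timestamps = Arrays.asList(5, 3, 9, 1, 7);
        System.out.println(
                sortAndBatch(timestamps, Comparator.<Integer>naturalOrder(), 2));
        // prints [[1, 3], [5, 7], [9]]
    }
}
```

With a batch size of 5000, each inner list would be what gets handed to the
mybatis InsertList statement in one call.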

I'm hoping these are all simple questions for a seasoned Camel user.

Some final questions:
- How do you know when a producer produces a List<T> or just a String?
- How do you know when a consumer expects a List or a String?
E.g. even for file2, if there are multiple files in the directory, is a
List<String> passed to the next component, where each element of the list is
one file's content, or is the content of one file passed along the route at
a time?

Thanks again!
     Aaron

Re: Looking for advice or help on batching/queueing for a fairly simple route

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

I have logged a JIRA ticket to allow the file/ftp components to not limit
eagerly. You can then sort all the files first, and apply the limit
afterwards.
https://issues.apache.org/jira/browse/CAMEL-5202
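
As a rough illustration of how such an endpoint might be configured once
the limit is no longer eager, the sketch below builds a file URI in plain
Java. Later Camel releases document a file option named
eagerMaxMessagesPerPoll; that this is the option resulting from the ticket
above is an assumption here, so check the documentation of the Camel
version in use. OldestFirstFileUri, build and the /data/ingest directory
are made-up names for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: builds a file endpoint URI that sorts before limiting. */
final class OldestFirstFileUri {

    static String build(String ingestDir) {
        Map<String, String> options = new LinkedHashMap<>();
        // sort by last-modified time, oldest first
        options.put("sortBy", "file:modified");
        // hand over a single file per poll
        options.put("maxMessagesPerPoll", "1");
        // assumed option name: apply the limit after sorting, not while scanning
        options.put("eagerMaxMessagesPerPoll", "false");

        StringJoiner query = new StringJoiner("&");
        options.forEach((key, value) -> query.add(key + "=" + value));
        return "file://" + ingestDir + "?" + query;
    }

    public static void main(String[] args) {
        // "/data/ingest" is a hypothetical directory
        System.out.println(build("/data/ingest"));
    }
}
```

Sorting every file in the directory before limiting keeps all candidates in
memory for the duration of the poll, so this trades memory for ordering.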



On Sun, Apr 15, 2012 at 10:06 AM, Claus Ibsen <cl...@gmail.com> wrote:
> Yeah, it used to be like that in the past. But then some people did FTP
> integration with a lot of FTP files, and the FTP library takes up more
> memory for each file handle.
>
> We could possibly consider adding a new option so you can flip
> between the two behaviors.
> You would then need to be aware that sorting like this takes up
> some memory, as it is in-memory sorting.
>
>
> An alternative, currently, is to implement a custom filter that
> finds the oldest file in the current dir and only returns true
> for that matching file name.



-- 
Claus Ibsen
-----------------
CamelOne 2012 Conference, May 15-16, 2012: http://camelone.com
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: Looking for advice or help on batching/queueing for a fairly simple route

Posted by Claus Ibsen <cl...@gmail.com>.
On Sun, Apr 8, 2012 at 4:14 AM, Aaron Daubman <da...@gmail.com> wrote:
> My issues are that:
> 1) there could be many files when the app starts up, and I would like to
> process one at a time - I see that file2 supports maxMessagesPerPoll,
> however I don't think that will work since I need to process in strict
> oldest-file-first order and the note on maxMessagesPerPoll says that it
> occurs BEFORE sorting, so if I use it, I am not guaranteed oldest file in
> the dir each time.

Yeah, it used to be like that in the past. But then some people did FTP
integration with a lot of FTP files, and the FTP library takes up more
memory for each file handle.

We could possibly consider adding a new option so you can flip
between the two behaviors.
You would then need to be aware that sorting like this takes up
some memory, as it is in-memory sorting.


An alternative, currently, is to implement a custom filter that
finds the oldest file in the current dir and only returns true
for that matching file name.
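
A rough sketch of the "find the oldest file" part of such a filter, written
in plain Java so it runs without Camel on the classpath. The names
OldestFileSelector, oldestIn and isOldest are made up for illustration. In
a route, isOldest would presumably be called from the accept method of a
GenericFileFilter implementation registered through the file endpoint's
filter option; that wiring is not shown and should be checked against the
Camel version in use.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

/** Hypothetical helper: decides whether a file name is the oldest in a directory. */
final class OldestFileSelector {

    /** Returns the regular file in dir with the smallest last-modified time, if any. */
    static Optional<File> oldestIn(File dir) {
        File[] files = dir.listFiles(File::isFile); // skips sub-directories
        if (files == null) {                        // dir is missing or not a directory
            return Optional.empty();
        }
        return Arrays.stream(files)
                .min(Comparator.comparingLong(File::lastModified)
                        .thenComparing(File::getName)); // tie-break on name
    }

    /** True only for the file name that is currently the oldest in dir. */
    static boolean isOldest(File dir, String fileName) {
        return oldestIn(dir)
                .map(oldest -> oldest.getName().equals(fileName))
                .orElse(false);
    }

    public static void main(String[] args) {
        // defaults to the working directory when no argument is given
        File dir = new File(args.length > 0 ? args[0] : ".");
        System.out.println(oldestIn(dir).map(File::getName).orElse("(no files)"));
    }
}
```

Each call lists the directory again, so for directories with many files it
may be worth computing the oldest name once per poll rather than once per
candidate file.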





