Posted to users@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2009/12/30 13:02:09 UTC

Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?

Hi

See also the route policy feature, which you can use to throttle the file
consumer to a pace of 5 concurrent files:
http://camel.apache.org/routepolicy.html
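
For example, attaching a ThrottlingInflightRoutePolicy to the route could look
roughly like this (a sketch only; as far as I recall the class lives in
org.apache.camel.impl in Camel 2.x, so double check the package and setter
names against the route policy page for your version):

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy;

public class FileRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
        // suspend the file consumer while 5 exchanges are in flight on this route
        policy.setMaxInflightExchanges(5);

        from("file://incoming")
            .routePolicy(policy)
            .to("seda:filequeue");
    }
}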



On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <ga...@steria.com> wrote:
>
>
> jonathanq wrote:
>>
>> I am trying to write a process that will use a file endpoint (camel 2.1.0)
>> to read from a directory.
>>
>> I need the process to read a file from the directory and then do some
>> processing on the contents (namely hitting a REST service for each record
>> in the file).  We have been asked to limit the number of threads that are
>> hitting the service to 5.  So we decided to simply process 5 files at a
>> time (to avoid concurrency issues reading 1 file and writing to 1 file
>> with 5 threads)
>>
>> I tried a few different approaches, and I wanted to see if there was a way
>> to do what I want.
>>
>> Approach 1:
>>
>> from("file://incoming").to("seda:filequeue")
>>
>> from("seda:filequeue").thread(5).process()
>>
>> Now - this reads in ALL of the files in the directory (places camelLock on
>> all) and then sends them to the seda endpoint.  I saw log messages that
>> referred to thread 1 through 6.  But from what I read in the
>> documentation, thread() is not necessarily going to limit it at that
>> number.
>>

thread(5) will limit processing to at most 5 concurrent threads from this point forward.


>> Approach 2:
>>
>> from("file://incoming").thread(5).process()
>>
>> This only processed 5 files at a time - but created camelLocks on all
>> files in the directory.
>>
>> So then I tried approach 3:
>>
>> from("file://incoming").to("seda:filequeue")
>>
>> from("seda:filequeue?concurrentConsumers=5").process()
>>
>> Again this seems to work, however it puts a camelLock on all the files
>> (because they were all processed by the first part of the route, they are
>> just queued up in the second).
>>
>>
>> While approach 3 works - what I would really like is to not have the
>> camelLock placed on the files that are not being processed.
>>
>> So watching the directory, there would be (at most) 5 files with camelLock
>> files created at a time, when they finish they are moved to the .camel
>> directory, and then it starts processing the next file in the directory.
>>

You can also implement your own ProcessStrategy, where you can refuse to
consume more than 5 files at any given time.
See the processStrategy option on the file consumer. Just return false
from the begin() method.

See
http://camel.apache.org/file2.html
at the bottom of the page.
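
A rough sketch of the idea (the begin/commit/rollback signatures below are
written from memory of the Camel 2.x GenericFileProcessStrategy interface, so
treat them as an assumption and check the javadoc for your exact version; the
counter-based limit is only an illustration):

import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.Exchange;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.component.file.GenericFileProcessStrategy;

public class MaxFiveProcessStrategy implements GenericFileProcessStrategy<File> {

    private final AtomicInteger inProgress = new AtomicInteger();

    public boolean begin(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint,
                         Exchange exchange, GenericFile<File> file) {
        if (inProgress.incrementAndGet() > 5) {
            // deny this file for now; it stays in the directory and is picked up on a later poll
            inProgress.decrementAndGet();
            return false;
        }
        return true;
    }

    public void commit(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint,
                       Exchange exchange, GenericFile<File> file) {
        inProgress.decrementAndGet();
    }

    public void rollback(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint,
                         Exchange exchange, GenericFile<File> file) {
        inProgress.decrementAndGet();
    }
}

You would then register the strategy in the registry (for example as "maxFive")
and refer to it from the endpoint, e.g.
from("file://incoming?processStrategy=#maxFive").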


>> Is that possible?  Is there anything I should be sure to do in an error
>> route so that I "roll back" the camel locks to ensure that unprocessed
>> files are ready to process the next time the application starts?
>>
>
> Hi,
>
> Maybe you can try the maxMessagesPerPoll parameter on the file
> endpoint, e.g.:
> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>
> Check the file component documentation : http://camel.apache.org/file2.html
>
> --
> View this message in context: http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?

Posted by Claus Ibsen <cl...@gmail.com>.
On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <jq...@abebooks.com> wrote:
>
> I took a good look at the Route Policy - at first the
> ThrottlingInflightRoutePolicy class seemed like it could work - as I really
> only want 5 exchanges to be in-flight at a time.
>
> Unfortunately it would never suspend the consumer.  I dug deeper into the
> code and discovered why.  The ThrottlingInflightRoutePolicy class only
> checks the number of inflight exchanges AFTER an exchange has been processed
> (the code to stop or start a consumer is all done in the onExchangeDone
> method).
>
> Since in my case the exchanges will take a while to process - it wouldn't
> know it had exceeded the maximum number until after it had finished
> processing one of them.
>
> In my opinion that is a bug - or at the very least an important thing to
> note in the documentation.  I spent a fair bit of time trying to figure out
> why I could not get it to work as it appeared it was supposed to.  All
> because it was not checking the inflight count against the threshold until
> after it had finished processing an exchange.
>
> I also tried writing my own FileThrottlingRoutePolicy that would test how
> many files were in an "inprogress" directory - and stop the consumer if it
> exceeded the max concurrent files.
>
> However I ran into read/write issues when I used the preMove option - for
> some reason my processes later would throw exceptions about file not found
> or file locks (I can't remember which - I have been trying so many different
> things today to try and get this working).
>
> In the end I solved my problem by avoiding my problem :)
>
> The primary reason I didn't want the file locks to occur is that they would
> mean manual cleanup if we ever had to kill the process while it's running.
> Otherwise, the next time it started, it would ignore any of the files that
> still had a lock file.
>
> I re-wrote my route to work as follows:
>
> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>
> This way - when files are "finished" they are placed in a "processed"
> directory, and when they fail they are put in a "failed" directory.  Anything
> still in the incoming directory is yet to be processed.  Because the record of
> what has and hasn't been processed was held only in memory, restarting the
> process will just re-process any of the files still in the incoming directory.
>
> No more lock files means restarting it won't force us to delete stale
> lock files by hand.
>
> I wish there were an easier way to do what I wanted.  Now I just have
> to rely on threads(5) to do the limiting to 5 files at a time.  Although,
> if I understand your comment (and the documentation), I can't actually rely
> on threads(5) to spawn 5 threads... it will just spawn UP TO 5 threads
> depending on the system load?
>

threads(5) uses the JDK Executors framework, which manages the thread pool. It
will use 5 threads if there is enough load for that; if you are only
processing 1-2 files concurrently then the thread pool may only have 2
active threads, etc. Rest assured, if there is load for 5
concurrent files then the thread pool will use 5 active threads.
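
So the whole route could then look roughly like this (a sketch only; the inline
processor body is just a placeholder for your own code that calls the REST
service for each record):

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class IncomingFilesRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("file://incoming?maxMessagesPerPoll=1&idempotent=true"
                + "&move=processed&moveFailed=failed&readLock=none")
            // hand each file over to a pool of at most 5 threads
            .threads(5)
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    // placeholder: read the file body and call the REST service per record
                }
            });
    }
}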

> Jonathan



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

On Thu, Dec 31, 2009 at 8:59 PM, jonathanq <jq...@abebooks.com> wrote:
>
> Sorry - I meant - this won't cause me too much trouble since I could just
> change the code type to be File not GenericFile.  Just thought it was
> strange.
>
> I actually changed it to do a .convertBodyTo(String.class) in my route -
> since I was just reading the file and that was why I needed a GenericFile
> object to get the name to read.  Not sure why I didn't think of doing that
> earlier.
>

GenericFile should really be an internal representation only. End
users are encouraged to use regular types such as java.io.File or
String, as you did.
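
For example, a processor can stay independent of GenericFile altogether - a
small sketch (MyFileProcessor is just an illustrative name, and the File
conversion relies on Camel's type converters):

import java.io.File;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class MyFileProcessor implements Processor {
    public void process(Exchange exchange) throws Exception {
        // let Camel's type converters hand over a plain java.io.File,
        // or use getBody(String.class) if you only need the contents
        File file = exchange.getIn().getBody(File.class);
        String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
        // ... read the file and call the REST service for each record ...
    }
}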



> Jonathan



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?

Posted by jonathanq <jq...@abebooks.com>.
Sorry - I meant - this won't cause me too much trouble since I could just
change the code type to be File not GenericFile.  Just thought it was
strange.

I actually changed it to do a .convertBodyTo(String.class) in my route -
since I was just reading the file and that was why I needed a GenericFile
object to get the name to read.  Not sure why I didn't think of doing that
earlier.  

Jonathan 


-- 
View this message in context: http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26981818.html
Sent from the Camel - Users mailing list archive at Nabble.com.


Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?

Posted by jonathanq <jq...@abebooks.com>.
Claus,

I figured out why my custom Route Policy wasn't working for me.  

It seems that there is an issue when I use preMove on the File endpoint -
when the Exchange reaches my first Processor, it is no longer a
"GenericFile" class.

My processor does this on the first line:

GenericFile file = exchange.getIn().getBody(GenericFile.class);

This works fine with my route - but as soon as I use preMove to put the
in-progress files into a "processing" directory, the type of the body is no
longer GenericFile but java.io.File.  So of course, file is now
null.

This isn't an issue for my code - I can change the type and everything will
work.  But it seems very strange that the body changes type when "preMove" is
set on the endpoint.

Is that expected behavior?

Jonathan



-- 
View this message in context: http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26981331.html
Sent from the Camel - Users mailing list archive at Nabble.com.


Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?

Posted by Claus Ibsen <cl...@gmail.com>.
On Thu, Dec 31, 2009 at 5:39 PM, jonathanq <jq...@abebooks.com> wrote:
>
> Excellent - that would definitely help my solution, as I could use lock files
> and if we had to kill the process, it would just delete those on next start
> up and re-process the files.
>
> So when is 2.2.0 coming out? :-)
>
I implemented this feature today, so yes, it will be in 2.2.

Early 2010. I hope we get it out at the start of February. The last
major goal is an improved thread pool configuration.


> The solution I have works - and will probably what we will use for this
> application. But that will help with future applications as we do end up
> writing a lot of file based camel processes.
>

Yeah, file handling is actually much harder than it first seems. Our goal is
to make the file / ftp components in Camel flexible and to cover many
of the use cases out there. So any feedback is valuable.

I will give it some thought to see if we can change the dynamic inflight
throttler to measure its metrics a bit earlier.

Okay, I guess it's time to celebrate the new year.



> Thanks for all the help!
>
> Jonathan
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?

Posted by jonathanq <jq...@abebooks.com>.
Excellent - that would definitely help my solution, as I could use lock files:
if we had to kill the process, it would just delete the stale lock files on the
next start-up and re-process the files.

So when is 2.2.0 coming out? :-)

The solution I have works - and is probably what we will use for this
application. But that feature will help with future applications, as we do end
up writing a lot of file-based Camel processes.

Thanks for all the help!

Jonathan


Claus Ibsen-2 wrote:
> 
> On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <jq...@abebooks.com> wrote:
>>
>> I took a good look at the Route Policy - at first the
>> ThrottlingInflightRoutePolicy class seemed like it could work - as I
>> really
>> only want 5 exchanges to be in-flight at a time.
>>
>> Unfortunately it would never suspend the consumer.  I dug deeper into the
>> code and discovered why.  The ThrottlingInflightRoutePolicy class only
>> checks the number of inflight exchanges AFTER an exchange has been
>> processed
>> (the code to stop or start a consumer is all done in the onExchangeDone
>> method).
>>
>> Since in my case the exchanges will take a while to process - it wouldn't
>> know it had exceeded the maximum number until after it had finished
>> processing one of them.
>>
>> In my opinion that is a bug - or at the very least an important thing to
>> note in the documentation.  I spent a fair bit of time trying to figure
>> out
>> why I could not get it to work as it appeared it was supposed to.  All
>> because it was not checking the inflight count against the threshold until
>> after it had finished processing an exchange.
>>
>> I also tried writing my own FileThrottlingRoutePolicy that would test how
>> many files were in an "inprogress" directory - and stop the consumer if it
>> exceeded the max concurrent files.
>>
>> However I ran into read/write issues when I used the preMove of files -
>> for
>> some reason my processes later would throw exceptions about file not
>> found
>> or file lock (I can't remember which - I have been trying so many
>> different
>> things today to try and get this working).
>>
>> In the end I solved my problem by avoiding my problem :)
>>
>> The primary reason I didn't want the file locks to occur is it would be a
>> manual cleanup if we ever had to kill the process while it's running.
>> Otherwise the next time it started, it would ignore any of the files that
>> had a lock file as well.
>>
> 
> We have a ticket for that
> https://issues.apache.org/activemq/browse/CAMEL-2082
> 
> 
> 
>> I re-wrote my route to work as follows:
>>
>> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>>
> 
> Nice solution :)
> 
>> This way - when files are "finished" they will be placed in a "processed"
>> directory, when they fail they are put in a "failed" directory.  Anything
>> still in the incoming directory is to be processed.  Because the memory
>> of
>> what was processed and what hasn't been was all in memory - restarting
>> the
>> process will just re-start any of the files still in the incoming
>> directory.
>>
>> No more Lock files means restarting it won't cause us to have to delete
>> .lock files.
>>
>> I wish there was still an easier way to do what I wanted.  Now I just
>> have
>> to rely on the threads(5) to do the limiting to 5 files at a time.
>>  Although
>> if I understand your comment (and the documentation) I can't actually
>> rely
>> on threads(5) to spawn 5 threads..it will just spawn UP TO 5 threads
>> depending on the system load?
>>
>> Jonathan
>>
>>
>>
>>
>> Claus Ibsen-2 wrote:
>>>
>>> Hi
>>>
>>> See also route policy to throttle the file consumer to a pace of 5
>>> concurrent files
>>> http://camel.apache.org/routepolicy.html
>>>
>>>
>>>
>>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <ga...@steria.com>
>>> wrote:
>>>>
>>>>
>>>> jonathanq wrote:
>>>>>
>>>>> I am trying to write a process that will use a file endpoint (camel
>>>>> 2.1.0)
>>>>> to read from a directory.
>>>>>
>>>>> I need the process to read a file from the directory and then do some
>>>>> processing on the contents (namely hitting a REST service for each
>>>>> record
>>>>> in the file).  We have been asked to limit the number of threads that
>>>>> are
>>>>> hitting the service to 5.  So we decided to simply process 5 files at
>>>>> a
>>>>> time (to avoid concurrency issues reading 1 file and writing to 1 file
>>>>> with 5 threads)
>>>>>
>>>>> I tried a few different approaches, and I wanted to see if there was a
>>>>> way
>>>>> to do what I want.
>>>>>
>>>>> Approach 1:
>>>>>
>>>>> from("file://incoming").to("seda:filequeue")
>>>>>
>>>>> from("seda:filequeue").thread(5).process()
>>>>>
>>>>> Now - this reads in ALL of the files in the directory (places
>>>>> camelLock
>>>>> on
>>>>> all) and then sends them to the seda endpoint.  I saw log messages
>>>>> that
>>>>> referred to thread 1 through 6.  But from what I read on the
>>>>> documentation, thread() is not necessarily going to limit it at that
>>>>> number.
>>>>>
>>>
>>> thread(5) will limit to at most 5 concurrent threads from this point
>>> forward.
>>>
>>>
>>>>> Approach 2:
>>>>>
>>>>> from("file://incoming").thread(5).process()
>>>>>
>>>>> This only processed 5 files at a time - but created camelLocks on all
>>>>> files in the directory.
>>>>>
>>>>> So then I tried approach 3:
>>>>>
>>>>> from("file://incoming").to("seda:filequeue")
>>>>>
>>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>>
>>>>> Again this seems to work, however it puts a camelLock on all the files
>>>>> (because they were all processed by the first part of the route, they
>>>>> are
>>>>> just queued up in the second).
>>>>>
>>>>>
>>>>> While approach 3 works - what I would really like is to not have the
>>>>> camelLock placed on the files that are not being processed.
>>>>>
>>>>> So watching the directory, there would be (at most) 5 files with
>>>>> camelLock
>>>>> files created at a time, when they finish they are moved to the .camel
>>>>> directory, and then it starts processing the next file in the
>>>>> directory.
>>>>>
>>>
>>> You can also implement your own ProcessStrategy where you can deny
>>> consuming more than 5 files at any given time.
>>> See the processStrategy option on the file consumer. Just return false
>>> on the begin() method.
>>>
>>> See
>>> http://camel.apache.org/file2.html
>>> at the bottom of the page.
>>>
>>>
>>>>> Is that possible?  Is there anything I should be sure to do in an
>>>>> error
>>>>> route so that I "roll back" the camel locks to ensure that unprocessed
>>>>> files are ready to process the next time the application starts?
>>>>>
>>>>
>>>> Hi,
>>>>
>>>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>>>> endpoint i.e.:
>>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>>>>
>>>> Check the file component documentation :
>>>> http://camel.apache.org/file2.html
>>>>
>>>> --
>>>> View this message in context:
>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Claus Ibsen
>>> Apache Camel Committer
>>>
>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>> Open Source Integration: http://fusesource.com
>>> Blog: http://davsclaus.blogspot.com/
>>> Twitter: http://twitter.com/davsclaus
>>>
>>>
>>
>> --
>> View this message in context:
>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26973577.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>
>>
> 
> 
> 
> -- 
> Claus Ibsen
> Apache Camel Committer
> 
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
> 
> 

-- 
View this message in context: http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26980045.html
Sent from the Camel - Users mailing list archive at Nabble.com.


Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?

Posted by Claus Ibsen <cl...@gmail.com>.
On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <jq...@abebooks.com> wrote:
>
> I took a good look at the Route Policy - at first the
> ThrottlingInflightRoutePolicy class seemed like it could work - as I really
> only want 5 exchanges to be in-flight at a time.
>
> Unfortunately it would never suspend the consumer.  I dug deeper into the
> code and discovered why.  The ThrottlingInflightRoutePolicy class only
> checks the number of inflight exchanges AFTER an exchange has been processed
> (the code to stop or start a consumer is all done in the onExchangeDone
> method).
>
> Since in my case the exchanges will take a while to process - it wouldn't
> know it had exceeded the maximum number until after it had finished
> processing one of them.
>
> In my opinion that is a bug - or at the very least an important thing to
> note in the documentation.  I spent a fair bit of time trying to figure out
> why I could not get it to work as it appeared it was supposed to.  All
> because it was not checking the inflight count against the threshold until
> after it had finished processing an exchange.
>
> I also tried writing my own FileThrottlingRoutePolicy that would test how
> many files were in an "inprogress" directory - and stop the consumer if it
> exceeded the max concurrent files.
>
> However I ran into read/write issues when I used the preMove of files - for
> some reason my processes later would throw exceptions about file not found
> or file lock (I can't remember which - I have been trying so many different
> things today to try and get this working).
>
> In the end I solved my problem by avoiding my problem :)
>
> The primary reason I didn't want the file locks to occur is it would be a
> manual cleanup if we ever had to kill the process while it's running.
> Otherwise the next time it started, it would ignore any of the files that
> had a lock file as well.
>

We have a ticket for that
https://issues.apache.org/activemq/browse/CAMEL-2082



> I re-wrote my route to work as follows:
>
> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>

Nice solution :)

> This way - when files are "finished" they will be placed in a "processed"
> directory, when they fail they are put in a "failed" directory.  Anything
> still in the incoming directory is to be processed.  Because the memory of
> what was processed and what hasn't been was all in memory - restarting the
> process will just re-start any of the files still in the incoming directory.
>
> No more Lock files means restarting it won't cause us to have to delete
> .lock files.
>
> I wish there was still an easier way to do what I wanted.  Now I just have
> to rely on the threads(5) to do the limiting to 5 files at a time.  Although
> if I understand your comment (and the documentation) I can't actually rely
> on threads(5) to spawn 5 threads..it will just spawn UP TO 5 threads
> depending on the system load?
>
> Jonathan
>
>
>
>
> Claus Ibsen-2 wrote:
>>
>> Hi
>>
>> See also route policy to throttle the file consumer to a pace of 5
>> concurrent files
>> http://camel.apache.org/routepolicy.html
>>
>>
>>
>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <ga...@steria.com>
>> wrote:
>>>
>>>
>>> jonathanq wrote:
>>>>
>>>> I am trying to write a process that will use a file endpoint (camel
>>>> 2.1.0)
>>>> to read from a directory.
>>>>
>>>> I need the process to read a file from the directory and then do some
>>>> processing on the contents (namely hitting a REST service for each
>>>> record
>>>> in the file).  We have been asked to limit the number of threads that
>>>> are
>>>> hitting the service to 5.  So we decided to simply process 5 files at a
>>>> time (to avoid concurrency issues reading 1 file and writing to 1 file
>>>> with 5 threads)
>>>>
>>>> I tried a few different approaches, and I wanted to see if there was a
>>>> way
>>>> to do what I want.
>>>>
>>>> Approach 1:
>>>>
>>>> from("file://incoming").to("seda:filequeue")
>>>>
>>>> from("seda:filequeue").thread(5).process()
>>>>
>>>> Now - this reads in ALL of the files in the directory (places camelLock
>>>> on
>>>> all) and then sends them to the seda endpoint.  I saw log messages that
>>>> referred to thread 1 through 6.  But from what I read on the
>>>> documentation, thread() is not necessarily going to limit it at that
>>>> number.
>>>>
>>
>> thread(5) will limit to at most 5 concurrent threads from this point
>> forward.
>>
>>
>>>> Approach 2:
>>>>
>>>> from("file://incoming").thread(5).process()
>>>>
>>>> This only processed 5 files at a time - but created camelLocks on all
>>>> files in the directory.
>>>>
>>>> So then I tried approach 3:
>>>>
>>>> from("file://incoming").to("seda:filequeue")
>>>>
>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>
>>>> Again this seems to work, however it puts a camelLock on all the files
>>>> (because they were all processed by the first part of the route, they
>>>> are
>>>> just queued up in the second).
>>>>
>>>>
>>>> While approach 3 works - what I would really like is to not have the
>>>> camelLock placed on the files that are not being processed.
>>>>
>>>> So watching the directory, there would be (at most) 5 files with
>>>> camelLock
>>>> files created at a time, when they finish they are moved to the .camel
>>>> directory, and then it starts processing the next file in the directory.
>>>>
>>
>> You can also implement your own ProcessStrategy where you can deny
>> consuming more than 5 files at any given time.
>> See the processStrategy option on the file consumer. Just return false
>> on the begin() method.
>>
>> See
>> http://camel.apache.org/file2.html
>> at the bottom of the page.
>>
>>
>>>> Is that possible?  Is there anything I should be sure to do in an error
>>>> route so that I "roll back" the camel locks to ensure that unprocessed
>>>> files are ready to process the next time the application starts?
>>>>
>>>
>>> Hi,
>>>
>>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>>> endpoint i.e.:
>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>>>
>>> Check the file component documentation :
>>> http://camel.apache.org/file2.html
>>>
>>> --
>>> View this message in context:
>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>
>>>
>>
>>
>>
>> --
>> Claus Ibsen
>> Apache Camel Committer
>>
>> Author of Camel in Action: http://www.manning.com/ibsen/
>> Open Source Integration: http://fusesource.com
>> Blog: http://davsclaus.blogspot.com/
>> Twitter: http://twitter.com/davsclaus
>>
>>
>
> --
> View this message in context: http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26973577.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?

Posted by jonathanq <jq...@abebooks.com>.
I took a good look at the Route Policy - at first the
ThrottlingInflightRoutePolicy class seemed like it could work - as I really
only want 5 exchanges to be in-flight at a time.

Unfortunately it would never suspend the consumer.  I dug deeper into the
code and discovered why.  The ThrottlingInflightRoutePolicy class only
checks the number of inflight exchanges AFTER an exchange has been processed
(the code to stop or start a consumer is all done in the onExchangeDone
method).

Since in my case the exchanges will take a while to process - it wouldn't
know it had exceeded the maximum number until after it had finished
processing one of them.

In my opinion that is a bug - or at the very least an important thing to
note in the documentation.  I spent a fair bit of time trying to figure out
why I could not get it to work as it appeared it was supposed to, all
because it was not checking the inflight count against the threshold until
after it had finished processing an exchange.
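
For reference, this is roughly how I had the policy wired up - just a
sketch from memory, so the exact setter/DSL names may be slightly off, and
the processor body is only a stand-in for our REST-calling code:

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy;

public class InflightLimitedRoute extends RouteBuilder {
    public void configure() throws Exception {
        // suspend the consumer once more than 5 exchanges are in flight on this route
        ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
        policy.setMaxInflightExchanges(5);

        from("file://incoming")
            .routePolicy(policy)
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    // hit the REST service for each record in the file here
                }
            });
    }
}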

I also tried writing my own FileThrottlingRoutePolicy that would check how
many files were in an "inprogress" directory - and stop the consumer if it
exceeded the maximum number of concurrent files.
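
The policy itself was nothing fancy - something along these lines (again a
rough sketch: the "inprogress" directory name is just what I used, and the
way I get hold of the consumer from the route may need adjusting for your
Camel version):

import java.io.File;

import org.apache.camel.Exchange;
import org.apache.camel.Route;
import org.apache.camel.impl.RoutePolicySupport;

public class FileThrottlingRoutePolicy extends RoutePolicySupport {

    private static final int MAX_CONCURRENT_FILES = 5;

    // the directory the file endpoint preMoves files into while they are processed
    private final File inProgressDir = new File("inprogress");

    public void onExchangeBegin(Route route, Exchange exchange) {
        File[] inFlight = inProgressDir.listFiles();
        int count = (inFlight == null) ? 0 : inFlight.length;
        try {
            if (count >= MAX_CONCURRENT_FILES) {
                // too many files on the go - suspend the file consumer
                stopConsumer(route.getConsumer());
            } else {
                // back under the limit - let the consumer poll again
                startConsumer(route.getConsumer());
            }
        } catch (Exception e) {
            throw new RuntimeException("Could not suspend/resume the file consumer", e);
        }
    }
}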

However I ran into read/write issues when I used the preMove option on the
files - for some reason my processes would later throw exceptions about the
file not being found or a file lock (I can't remember which - I have been
trying so many different things today to try and get this working).

In the end I solved my problem by avoiding my problem :)

The primary reason I didn't want the file locks to occur is that it would
mean a manual cleanup if we ever had to kill the process while it's running.
Otherwise, the next time it started, it would ignore any of the files that
had a lock file as well.

I re-wrote my route to work as follows:

from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()

This way - when files are "finished" they will be placed in a "processed"
directory, and when they fail they are put in a "failed" directory.  Anything
still in the incoming directory has yet to be processed.  Because the record
of what has and hasn't been processed is held only in memory, restarting the
process will just re-start any of the files still in the incoming directory.
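
For completeness, the whole route class now looks more or less like this
(trimmed down; the processor body is just a placeholder for the code that
calls the REST service for each record):

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class IncomingFileRoute extends RouteBuilder {
    public void configure() throws Exception {
        // one file per poll, in-memory idempotent tracking, no lock files,
        // move to processed/ on success and failed/ on failure
        from("file://incoming?maxMessagesPerPoll=1&idempotent=true"
                + "&moveFailed=failed&move=processed&readLock=none")
            .threads(5)  // at most 5 files are processed concurrently
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    // call the REST service for each record in the file here
                }
            });
    }
}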

No more Lock files means restarting it won't cause us to have to delete
.lock files.

I still wish there were an easier way to do what I wanted.  Now I just have
to rely on threads(5) to do the limiting to 5 files at a time.  Although,
if I understand your comment (and the documentation), I can't actually rely
on threads(5) to spawn 5 threads... it will just spawn UP TO 5 threads
depending on the system load?

Jonathan




Claus Ibsen-2 wrote:
> 
> Hi
> 
> See also route policy to throttle the file consumer to a pace of 5
> concurrent files
> http://camel.apache.org/routepolicy.html
> 
> 
> 
> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <ga...@steria.com>
> wrote:
>>
>>
>> jonathanq wrote:
>>>
>>> I am trying to write a process that will use a file endpoint (camel
>>> 2.1.0)
>>> to read from a directory.
>>>
>>> I need the process to read a file from the directory and then do some
>>> processing on the contents (namely hitting a REST service for each
>>> record
>>> in the file).  We have been asked to limit the number of threads that
>>> are
>>> hitting the service to 5.  So we decided to simply process 5 files at a
>>> time (to avoid concurrency issues reading 1 file and writing to 1 file
>>> with 5 threads)
>>>
>>> I tried a few different approaches, and I wanted to see if there was a
>>> way
>>> to do what I want.
>>>
>>> Approach 1:
>>>
>>> from("file://incoming").to("seda:filequeue")
>>>
>>> from("seda:filequeue").thread(5).process()
>>>
>>> Now - this reads in ALL of the files in the directory (places camelLock
>>> on
>>> all) and then sends them to the seda endpoint.  I saw log messages that
>>> referred to thread 1 through 6.  But from what I read on the
>>> documentation, thread() is not necessarily going to limit it at that
>>> number.
>>>
> 
> thread(5) will limit to at most 5 concurrent threads from this point
> forward.
> 
> 
>>> Approach 2:
>>>
>>> from("file://incoming").thread(5).process()
>>>
>>> This only processed 5 files at a time - but created camelLocks on all
>>> files in the directory.
>>>
>>> So then I tried approach 3:
>>>
>>> from("file://incoming").to("seda:filequeue")
>>>
>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>
>>> Again this seems to work, however it puts a camelLock on all the files
>>> (because they were all processed by the first part of the route, they
>>> are
>>> just queued up in the second).
>>>
>>>
>>> While approach 3 works - what I would really like is to not have the
>>> camelLock placed on the files that are not being processed.
>>>
>>> So watching the directory, there would be (at most) 5 files with
>>> camelLock
>>> files created at a time, when they finish they are moved to the .camel
>>> directory, and then it starts processing the next file in the directory.
>>>
> 
> You can also implement your own ProcessStrategy where you can deny
> consuming more than 5 files at any given time.
> See the processStrategy option on the file consumer. Just return false
> on the begin() method.
> 
> See
> http://camel.apache.org/file2.html
> at the bottom of the page.
> 
> 
>>> Is that possible?  Is there anything I should be sure to do in an error
>>> route so that I "roll back" the camel locks to ensure that unprocessed
>>> files are ready to process the next time the application starts?
>>>
>>
>> Hi,
>>
>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>> endpoint i.e.:
>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>>
>> Check the file component documentation :
>> http://camel.apache.org/file2.html
>>
>> --
>> View this message in context:
>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>
>>
> 
> 
> 
> -- 
> Claus Ibsen
> Apache Camel Committer
> 
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
> 
> 

-- 
View this message in context: http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26973577.html
Sent from the Camel - Users mailing list archive at Nabble.com.