Posted to users@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2010/01/01 15:32:44 UTC

Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?

Hi

On Thu, Dec 31, 2009 at 8:59 PM, jonathanq <jq...@abebooks.com> wrote:
>
> Sorry - I meant - this won't cause me too much trouble since I could just
> change the code type to be File not GenericFile.  Just thought it was
> strange.
>
> I actually changed it to do a .convertBodyTo(String.class) in my route -
> since I was just reading the file and that was why I needed a GenericFile
> object to get the name to read.  Not sure why I didn't think of doing that
> earlier.
>

GenericFile should really be an internal representation only. End
users are encouraged to use regular types such as java.io.File or
String, as you did.



> Jonathan
>
> jonathanq wrote:
>>
>> Claus,
>>
>> I figured out why my custom Route Policy wasn't working for me.
>>
>> It seems that there is an issue when I use preMove on the File endpoint -
>> when the Exchange reaches my first Processor, it is no longer a
>> "GenericFile" class.
>>
>> My processor does this on the first line:
>>
>> GenericFile file = exchange.getIn().getBody(GenericFile.class);
>>
>> This works fine with my route - but as soon as I use preMove and put the
>> in progress files into a "processing" directory.  The type of the body is
>> no longer GenericFile, but is now a java.io.File.  So of course, file is
>> now null.
>>
>> This isn't an issue from my code, I can change the type and everything
>> will work.  But it seems very strange that it changes types when "preMove"
>> is set on the endpoint.
>>
>> Is that expected behavior?
>>
>> Jonathan
>>
>>
>> Claus Ibsen-2 wrote:
>>>
>>> On Thu, Dec 31, 2009 at 5:39 PM, jonathanq <jq...@abebooks.com> wrote:
>>>>
>>>> Excellent - that would definitely help my solution, as I could use lock
>>>> files and if we had to kill the process, it would just delete those on
>>>> next start up and re-process the files.
>>>>
>>>> So when is 2.2.0 coming out? :-)
>>>>
>>> I implemented this feature today so yeah it will be in 2.2
>>>
>>> Early 2010. I hope we get it out in the start of February. The last
>>> major goal is to have an improved thread pool configuration.
>>>
>>>
>>>> The solution I have works - and will probably be what we will use for
>>>> this application. But that will help with future applications as we do
>>>> end up writing a lot of file based camel processes.
>>>>
>>>
>>> Yeah file is actually much harder than at first thought. Our goal is
>>> to make the file / ftp components in Camel flexible and to cover many
>>> of the use cases out there. So any feedback is valuable.
>>>
>>> I will give it some thought to see if we can change the dynamic inflight
>>> throttler to measure metrics a bit earlier.
>>>
>>> Okay I guess it's time to celebrate the new year.
>>>
>>>
>>>
>>>> Thanks for all the help!
>>>>
>>>> Jonathan
>>>>
>>>>
>>>> Claus Ibsen-2 wrote:
>>>>>
>>>>> On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <jq...@abebooks.com> wrote:
>>>>>>
>>>>>> I took a good look at the Route Policy - at first the
>>>>>> ThrottlingInflightRoutePolicy class seemed like it could work - as I
>>>>>> really only want 5 exchanges to be in-flight at a time.
>>>>>>
>>>>>> Unfortunately it would never suspend the consumer.  I dug deeper into
>>>>>> the code and discovered why.  The ThrottlingInflightRoutePolicy class
>>>>>> only checks the number of inflight exchanges AFTER an exchange has been
>>>>>> processed (the code to stop or start a consumer is all done in the
>>>>>> onExchangeDone method).
>>>>>>
>>>>>> Since in my case the exchanges will take a while to process - it
>>>>>> wouldn't know it had exceeded the maximum number until after it had
>>>>>> finished processing one of them.
>>>>>>
>>>>>> In my opinion that is a bug - or at the very least an important thing
>>>>>> to note in the documentation.  I spent a fair bit of time trying to
>>>>>> figure out why I could not get it to work as it appeared it was
>>>>>> supposed to.  All because it was not checking the inflight numbers
>>>>>> against the threshold until after it had finished processing an
>>>>>> exchange.
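
To make the timing problem concrete, here is a minimal plain-JDK sketch.
It does not use Camel at all: the Consumer class, its begin()/done()
methods and the checkOnBegin flag are made up for illustration and are
not the real ThrottlingInflightRoutePolicy code. It assumes 20 slow
exchanges that are all offered before the first one completes, and
compares a throttle evaluated only on completion with one that is also
evaluated when an exchange starts.

```java
public class ThrottleTiming {
    static final int MAX_INFLIGHT = 5;

    /** Hypothetical stand-in for a consumer plus throttling policy; not Camel's classes. */
    static class Consumer {
        final boolean checkOnBegin;
        int inflight = 0;
        int peak = 0;
        boolean suspended = false;

        Consumer(boolean checkOnBegin) {
            this.checkOnBegin = checkOnBegin;
        }

        /** Returns true if the exchange was accepted, false if the consumer is suspended. */
        boolean begin() {
            if (suspended) {
                return false;
            }
            inflight++;
            peak = Math.max(peak, inflight);
            if (checkOnBegin) {
                throttle();
            }
            return true;
        }

        /** Called when an exchange completes; this is the only check in the done-only variant. */
        void done() {
            inflight--;
            throttle();
        }

        private void throttle() {
            suspended = inflight >= MAX_INFLIGHT;
        }
    }

    /** Offers `files` slow exchanges; none completes until all have been offered. */
    static int peakInflight(boolean checkOnBegin, int files) {
        Consumer consumer = new Consumer(checkOnBegin);
        int accepted = 0;
        for (int i = 0; i < files; i++) {
            if (consumer.begin()) {
                accepted++;
            }
        }
        for (int i = 0; i < accepted; i++) {
            consumer.done();
        }
        return consumer.peak;
    }

    public static void main(String[] args) {
        System.out.println("check on done only: peak = " + peakInflight(false, 20));
        System.out.println("check on begin too: peak = " + peakInflight(true, 20));
    }
}
```

Run as-is it should report a peak of 20 for the done-only check and 5
when the check also runs at the start, which matches what was observed
with long-running exchanges.
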
>>>>>>
>>>>>> I also tried writing my own FileThrottlingRoutePolicy that would test
>>>>>> how many files were in an "inprogress" directory - and stop the
>>>>>> consumer if it exceeded the max concurrent files.
>>>>>>
>>>>>> However I ran into read/write issues when I used the preMove of files -
>>>>>> for some reason my processes later would throw exceptions about file
>>>>>> not found or file lock (I can't remember which - I have been trying so
>>>>>> many different things today to try and get this working).
>>>>>>
>>>>>> In the end I solved my problem by avoiding my problem :)
>>>>>>
>>>>>> The primary reason I didn't want the file locks to occur is it would
>>>>>> be a manual cleanup if we ever had to kill the process while it's
>>>>>> running.  Otherwise the next time it started, it would ignore any of
>>>>>> the files that had a lock file as well.
>>>>>>
>>>>>
>>>>> We have a ticket for that
>>>>> https://issues.apache.org/activemq/browse/CAMEL-2082
>>>>>
>>>>>
>>>>>
>>>>>> I re-wrote my route to work as follows:
>>>>>>
>>>>>> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>>>>>>
>>>>>
>>>>> Nice solution :)
>>>>>
>>>>>> This way - when files are "finished" they will be placed in a
>>>>>> "processed" directory, when they fail they are put in a "failed"
>>>>>> directory.  Anything still in the incoming directory is to be
>>>>>> processed.  Because the record of what was processed and what hasn't
>>>>>> been is all in memory - restarting the process will just re-start any
>>>>>> of the files still in the incoming directory.
>>>>>>
>>>>>> No more Lock files means restarting it won't cause us to have to
>>>>>> delete .lock files.
>>>>>>
>>>>>> I wish there was still an easier way to do what I wanted.  Now I just
>>>>>> have to rely on the threads(5) to do the limiting to 5 files at a
>>>>>> time.  Although if I understand your comment (and the documentation) I
>>>>>> can't actually rely on threads(5) to spawn 5 threads..it will just
>>>>>> spawn UP TO 5 threads depending on the system load?
>>>>>>
>>>>>> Jonathan
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Claus Ibsen-2 wrote:
>>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> See also route policy to throttle the file consumer to a pace of 5
>>>>>>> concurrent files
>>>>>>> http://camel.apache.org/routepolicy.html
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <ga...@steria.com> wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> jonathanq wrote:
>>>>>>>>>
>>>>>>>>> I am trying to write a process that will use a file endpoint (camel
>>>>>>>>> 2.1.0) to read from a directory.
>>>>>>>>>
>>>>>>>>> I need the process to read a file from the directory and then do
>>>>>>>>> some processing on the contents (namely hitting a REST service for
>>>>>>>>> each record in the file).  We have been asked to limit the number
>>>>>>>>> of threads that are hitting the service to 5.  So we decided to
>>>>>>>>> simply process 5 files at a time (to avoid concurrency issues
>>>>>>>>> reading 1 file and writing to 1 file with 5 threads)
>>>>>>>>>
>>>>>>>>> I tried a few different approaches, and I wanted to see if there
>>>>>>>>> was a way to do what I want.
>>>>>>>>>
>>>>>>>>> Approach 1:
>>>>>>>>>
>>>>>>>>> from("file://incoming").to("seda:filequeue")
>>>>>>>>>
>>>>>>>>> from("seda:filequeue").thread(5).process()
>>>>>>>>>
>>>>>>>>> Now - this reads in ALL of the files in the directory (places
>>>>>>>>> camelLock on all) and then sends them to the seda endpoint.  I saw
>>>>>>>>> log messages that referred to thread 1 through 6.  But from what I
>>>>>>>>> read in the documentation, thread() is not necessarily going to
>>>>>>>>> limit it at that number.
>>>>>>>>>
>>>>>>>
>>>>>>> thread(5) will limit to at most 5 concurrent threads from this point
>>>>>>> forward.
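
As a rough illustration of "at most 5", here is a plain
java.util.concurrent sketch. It assumes the threading step behaves like
an ordinary fixed-size thread pool; that is an assumption for the
example, not a statement about how Camel implements it. PoolCap and
peakConcurrency are hypothetical names, and the 20 ms sleep stands in
for the slow per-file work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolCap {

    /** Runs `tasks` short jobs on a pool of `poolSize` threads and returns the peak concurrency seen. */
    static int peakConcurrency(int poolSize, int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                // Record how many jobs are running at the moment this one starts.
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // simulated slow work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }

        // Stop accepting new jobs and wait for the queued ones to drain.
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency: " + peakConcurrency(5, 20));
    }
}
```

The pool caps how many jobs run at once; the remaining jobs wait in the
pool's queue. Note that this says nothing about how many files the
consumer upstream has already picked up and locked.
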
>>>>>>>
>>>>>>>
>>>>>>>>> Approach 2:
>>>>>>>>>
>>>>>>>>> from("file://incoming").thread(5).process()
>>>>>>>>>
>>>>>>>>> This only processed 5 files at a time - but created camelLocks on
>>>>>>>>> all files in the directory.
>>>>>>>>>
>>>>>>>>> So then I tried approach 3:
>>>>>>>>>
>>>>>>>>> from("file://incoming").to("seda:filequeue")
>>>>>>>>>
>>>>>>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>>>>>>
>>>>>>>>> Again this seems to work, however it puts a camelLock on all the
>>>>>>>>> files (because they were all processed by the first part of the
>>>>>>>>> route, they are just queued up in the second).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> While approach 3 works - what I would really like is to not have
>>>>>>>>> the camelLock placed on the files that are not being processed.
>>>>>>>>>
>>>>>>>>> So watching the directory, there would be (at most) 5 files with
>>>>>>>>> camelLock files created at a time, when they finish they are moved
>>>>>>>>> to the .camel directory, and then it starts processing the next
>>>>>>>>> file in the directory.
>>>>>>>>>
>>>>>>>
>>>>>>> You can also implement your own ProcessStrategy where you can refuse
>>>>>>> to consume more than 5 files at any given time.
>>>>>>> See the processStrategy option on the file consumer. Just return
>>>>>>> false on the begin() method.
>>>>>>>
>>>>>>> See http://camel.apache.org/file2.html at the bottom of the page.
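
The counting part of such a strategy can be sketched with a Semaphore.
This is plain JDK code under stated assumptions, not the Camel
interface: InflightGate and its begin()/done() methods are hypothetical
helpers that a custom strategy could delegate to, and wiring them into
the processStrategy option is not shown here.

```java
import java.util.concurrent.Semaphore;

public class InflightGate {
    private final Semaphore permits;

    public InflightGate(int maxInflight) {
        this.permits = new Semaphore(maxInflight);
    }

    /** Call where the strategy's begin() runs; false means "skip this file for now". */
    public boolean begin() {
        return permits.tryAcquire();
    }

    /** Call once per accepted file when it finishes, on success or on failure. */
    public void done() {
        permits.release();
    }

    public static void main(String[] args) {
        InflightGate gate = new InflightGate(5);
        int accepted = 0;
        for (int i = 0; i < 8; i++) {
            if (gate.begin()) {
                accepted++;
            }
        }
        System.out.println("accepted " + accepted + " of 8");

        // One file finishes, so one slot is free again.
        gate.done();
        System.out.println("after one finishes: " + gate.begin());
    }
}
```

tryAcquire() never blocks, so a poll that finds no free slot simply
leaves the file alone until a later poll. done() must be called exactly
once for every begin() that returned true, otherwise slots leak or the
limit drifts upward.
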
>>>>>>>
>>>>>>>
>>>>>>>>> Is that possible?  Is there anything I should be sure to do in an
>>>>>>>>> error route so that I "roll back" the camel locks to ensure that
>>>>>>>>> unprocessed files are ready to process the next time the
>>>>>>>>> application starts?
>>>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Maybe you can try to use the parameter maxMessagesPerPoll on the
>>>>>>>> file endpoint, i.e.:
>>>>>>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>>>>>>>>
>>>>>>>> Check the file component documentation :
>>>>>>>> http://camel.apache.org/file2.html
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context:
>>>>>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>>>>>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Claus Ibsen
>>>>>>> Apache Camel Committer
>>>>>>>
>>>>>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>>>>>> Open Source Integration: http://fusesource.com
>>>>>>> Blog: http://davsclaus.blogspot.com/
>>>>>>> Twitter: http://twitter.com/davsclaus
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26973577.html
>>>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26980045.html
>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>
>>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>
> --
> View this message in context: http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26981818.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus