Posted to users@camel.apache.org by Ralf Steppacher <ra...@derivativepartners.com> on 2012/08/30 17:24:35 UTC

File and async/<camel:threads> woes

Hello all,

I have several issues defining an async route in Spring XML.
Can someone shed some light on what the expected behavior is and what
would be a bug and/or a misunderstanding on my side?

The stripped down route:

<endpoint id="fileBufferFrom"
uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress" />

<camel:route id="processFromFileBuffer">
  <camel:from ref="fileBufferFrom" />
  <camel:threads poolSize="5" maxPoolSize="5" maxQueueSize="1"
threadName="file consumer" rejectedPolicy="Abort">
    <camel:convertBodyTo type="java.io.InputStream" />
    <camel:split streaming="true" parallelProcessing="true">
        <camel:tokenize token="\r\n" />
        <camel:to ref="mq.csv" />
    </camel:split>
  </camel:threads>
</camel:route>


Camel 2.9.2
If maxQueueSize > 0 then maxPoolSize + maxQueueSize files are moved from
the in-folder to the inprogress-folder. If there are more files they are
moved to the inprogress-folder as soon as the caller thread is free to
move them and worker threads are available. 
Setting rejectedPolicy="Abort" or callerRunsWhenRejected="false" seems
to get ignored and the reject policy "CallerRuns" is applied. The source
is a file endpoint, so I assume the thread with the scanned directory as
its name is the caller thread? I can see that thread come to life in
VisualVM.

If maxQueueSize = 0 then all files present in the input directory are
always moved to the inprogress-folder AND the rejectedPolicy="Abort" is
honored! There are always maxPoolSize files processed in parallel.


Camel 2.10.0
Independent of the value of maxQueueSize, if rejectedPolicy="Abort" then
the policy is honored, sort of. All files present in the input directory
are always moved to the inprogress-folder. Only maxPoolSize +
maxQueueSize files are being processed. All others stay untouched in the
"inprogress" folder!

Using <camel:threadPool> and referencing that in <camel:threads> does
not change the above behavior for 2.9.2 or 2.10.0.
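
For reference, the <camel:threadPool> variant mentioned above might look roughly like the sketch below. The pool id "fileWorkerPool" is a made-up name for illustration; the sizing and policy values are copied from the stripped-down route, and the refs "fileBufferFrom" and "mq.csv" are assumed to be defined as in that route.

```xml
<!-- Inside <camel:camelContext>; "fileWorkerPool" is a hypothetical id -->
<camel:threadPool id="fileWorkerPool" threadName="file consumer"
                  poolSize="5" maxPoolSize="5" maxQueueSize="1"
                  rejectedPolicy="Abort" />

<camel:route id="processFromFileBuffer">
  <camel:from ref="fileBufferFrom" />
  <!-- Reference the shared pool instead of configuring it inline -->
  <camel:threads executorServiceRef="fileWorkerPool">
    <camel:convertBodyTo type="java.io.InputStream" />
    <camel:split streaming="true" parallelProcessing="true">
      <camel:tokenize token="\r\n" />
      <camel:to ref="mq.csv" />
    </camel:split>
  </camel:threads>
</camel:route>
```

Only the place where the pool is declared changes here, which fits the observation that the behavior stays the same.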


The behavior I expected was that with rejectedPolicy="Abort" always
maxPoolSize + maxQueueSize files are moved from the in-folder to the
inprogress-folder and processed from there. As processing of one file
completes it gets deleted and a file from the in-folder is moved to the
inprogress-folder on the next poll of the in-folder.


Thanks!
Ralf

Re: File and async/<camel:threads> woes

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Thanks for sharing your findings.

On Wed, Sep 26, 2012 at 7:47 PM, Ralf Steppacher
<ra...@derivativepartners.com> wrote:
> Well, turns out that the only reliable way of preventing the file
> consumer thread from being assigned tasks is to set the queue depth of
> the executor pool to "unlimited" (maxQueueSize="0") and use the
> throttling route policy and sufficiently low maxMessagesPerPoll at the
> file endpoint to manage the queue depth.
>
> Ralf



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
FuseSource is now part of Red Hat
Email: cibsen@redhat.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: File and async/<camel:threads> woes

Posted by Ralf Steppacher <ra...@derivativepartners.com>.
Well, turns out that the only reliable way of preventing the file
consumer thread from being assigned tasks is to set the queue depth of
the executor pool to "unlimited" (maxQueueSize="0") and use the
throttling route policy and sufficiently low maxMessagesPerPoll at the
file endpoint to manage the queue depth.
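
A minimal sketch of how this combination might be wired in Spring XML. The pool size of 10, maxMessagesPerPoll=1 and the bean id "inflightThrottle" are illustrative assumptions, not values taken from this thread. The referenced bean is assumed to be a ThrottlingInflightRoutePolicy defined elsewhere in the Spring context.

```xml
<!-- Inside <camel:camelContext>: let each poll pick up only a few files -->
<endpoint id="fileBufferFrom"
  uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress&amp;maxMessagesPerPoll=1" />

<!-- routePolicyRef points at an assumed ThrottlingInflightRoutePolicy bean (not shown) -->
<camel:route id="processFromFileBuffer" routePolicyRef="inflightThrottle">
  <camel:from ref="fileBufferFrom" />
  <!-- maxQueueSize="0" is the setting described above as an unlimited queue -->
  <camel:threads poolSize="10" maxPoolSize="10" maxQueueSize="0"
                 threadName="file consumer">
    <camel:convertBodyTo type="java.io.InputStream" />
    <camel:split streaming="true" parallelProcessing="true">
      <camel:tokenize token="\r\n" />
      <camel:to ref="mq.csv" />
    </camel:split>
  </camel:threads>
</camel:route>
```

With an unbounded queue the pool never rejects a task, so the route policy and the per-poll limit are what keep the backlog small.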

Ralf




Re: File and async/<camel:threads> woes

Posted by Ralf Steppacher <ra...@derivativepartners.com>.
Claus,

I finally implemented your suggestion with the route policy and was able
to work around the problem of the caller thread getting blocked by a
large task. Indeed the throttling affects the number of files being
processed, not the number of exchanges created by the splitter.
In my case, if the number of threads in the pool is about double the
suspend threshold I can prevent the caller thread from being blocked.
However, if the pattern of file sizes changes, this might not be true
anymore.

I also realized that I have to set useFixedDelay=false on the file
endpoint to allow for continuous feeding of files for async processing.
The default of useFixedDelay=true also leads to large files blocking
processing of pending files (by way of preventing the file endpoint from
polling).
I double-checked whether this was my root problem all along, but it is
not. Without the throttling route policy and with useFixedDelay=false
processing still gets stuck on large files.
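
As a sketch, the endpoint from the original route with that option added could look like this. The delay value is an illustrative assumption; the other options are copied from the original endpoint.

```xml
<!-- useFixedDelay=false as described above; delay=500 (ms) is illustrative -->
<endpoint id="fileBufferFrom"
  uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress&amp;useFixedDelay=false&amp;delay=500" />
```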

All the above I have tested with Camel 2.9.2. Due to time constraints I
have not yet given 2.10.x another shot with this.


Ralf



Re: File and async/<camel:threads> woes

Posted by Ralf Steppacher <ra...@derivativepartners.com>.
Claus,

the route policy applies to the route as a whole, does it not? Will
setting a limit of 5 not primarily limit the splitter? I will check it
out... (not within the next two weeks though, unfortunately).

I did not look beyond the policy "Abort" because the online
documentation says only Abort and CallerRuns work properly in 2.9 and
below. By their names the other two policies (Discard, DiscardOldest) did
not appear suitable to me because they discard exchanges/files. However,
as you said, I need "reject and retry" behavior.
"Abort" did what I need in 2.9 (apart from the problem with the caller
thread being used), but not any more in 2.10.


Thanks again!
Ralf


-----Original Message-----
From: Claus Ibsen <cl...@gmail.com>
Reply-to: users@camel.apache.org
To: users@camel.apache.org
Subject: Re: File and async/<camel:threads> woes
Date: Sun, 9 Sep 2012 10:33:45 +0200

Hi

You can use route policy to control the route consumer to
suspend/resume, depending on number of in flight exchanges for the
route.
http://camel.apache.org/routepolicy

Then you can suspend when reaching 5, and resume when back to 4 or lower.
There is a ThrottlingInflightRoutePolicy for that, though its
watermark is percent-based. Also consider setting maxMessagesPerPoll=1
on the file consumer endpoint, so it only grabs 1 file at a time.

About the thread pools. Take a closer look at the difference between
Abort and Reject. You would need to use the one that ensures a
rollback, when the thread pool is full. So the preMove file gets
rolled back (assuming that logic works).




On Fri, Sep 7, 2012 at 3:54 PM, Ralf Steppacher
<ra...@derivativepartners.com> wrote:
> Hello Claus,
>
> If I want 5 files to be processed in parallel line by line, every line
> taken care of asynchronously, don't I need the
> <threads ...><camel:split ...></camel:split></threads> construct?
> If threads and split refer to the same thread pool, then of course I
> would not have to worry about finding the sweet spot in terms of number
> of threads in the two pools; to keep all threads in split-pool as busy
> as possible and have no thread in the threads-pool waiting. I prefer to
> keep them apart because that way I have more control over the number
> and order (small files first) in which they are processed.
>
> Using one thread pool would not solve my problem of the rejectedPolicy
> not being honored, would it? This really kills me. I am processing files
> ranging from 1 KB to 200 MB, sorted by their size. If the caller thread
> ends up with the 200mb file, all other processing stalls until the big
> file has been processed. I need to prevent processing of small files
> being blocked by large files.
>
> Is the file consumer supposed to preMove all files from the input
> directory to the inprogress directory (2.10.0 behavior) or just the ones
> it is actually going to process, i.e. number of threads (2.9.2
> behavior)?
>
> With rejectedPolicy="Abort":
> If the 2.10.0 preMove behavior is the expected behavior, is it expected
> that all files that could not be processed on the first poll of the file
> endpoint (file number > number of threads) stay in the inprogress
> directory forever, never being processed (2.10.0 behavior) or should
> they be picked up as soon as a thread becomes free (2.9.2 behavior with
> respective preMove behavior) and the file endpoint polls again?
>
>
> Thanks!
> Ralf
>
>
> -----Original Message-----
> From: Claus Ibsen <cl...@gmail.com>
> Reply-to: users@camel.apache.org
> To: users@camel.apache.org
> Subject: Re: File and async/<camel:threads> woes
> Date: Thu, 6 Sep 2012 11:30:29 +0200
>
> Hi
>
> Why make it so complicated with 2 thread pools? The splitter can just
> refer to a custom thread pool profile / thread pool which you can
> customize as you want.
>
> Also the file consumer with preMove will move the file as soon as it starts routing.
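
A sketch of the single-pool variant suggested in the quoted message, under the assumption that the splitter's executorServiceRef may name a thread pool profile. The profile id "splitProfile" and its sizes are hypothetical.

```xml
<!-- Inside <camel:camelContext>; profile id and sizes are illustrative -->
<camel:threadPoolProfile id="splitProfile"
                         poolSize="5" maxPoolSize="5" maxQueueSize="1"
                         rejectedPolicy="Abort" />

<camel:route id="processFromFileBuffer">
  <camel:from ref="fileBufferFrom" />
  <camel:convertBodyTo type="java.io.InputStream" />
  <!-- No surrounding <camel:threads>: the splitter alone does the parallel work -->
  <camel:split streaming="true" parallelProcessing="true"
               executorServiceRef="splitProfile">
    <camel:tokenize token="\r\n" />
    <camel:to ref="mq.csv" />
  </camel:split>
</camel:route>
```

In this layout the file consumer thread drives each file through the splitter, so files are handled one after another while the lines of each file are processed in parallel.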




Re: File and async/<camel:threads> woes

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

You can use a route policy to suspend/resume the route consumer,
depending on the number of in-flight exchanges for the route.
http://camel.apache.org/routepolicy

Then you can suspend when reaching 5, and resume when back to 4 or lower.
There is a ThrottlingInflightRoutePolicy for that, though its
watermark is percentage based. Also consider setting maxMessagesPerPoll=1
on the file consumer endpoint, so it only grabs 1 file at a time.
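
A minimal sketch of that setup in Spring XML, untested against either
Camel version discussed in this thread. The bean id "inflightPolicy" is
made up for illustration, and resumePercentOfMax=80 is an assumption
chosen so that 80% of 5 gives the "resume at 4" mentioned above. The
exact counts at which the policy suspends and resumes should be checked
against the Camel version in use.

<!-- "inflightPolicy" is a hypothetical id; 80% of 5 = resume at 4 (assumed) -->
<bean id="inflightPolicy"
      class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
  <property name="maxInflightExchanges" value="5" />
  <property name="resumePercentOfMax" value="80" />
</bean>

<endpoint id="fileBufferFrom"
uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress&amp;maxMessagesPerPoll=1" />

<camel:route id="processFromFileBuffer" routePolicyRef="inflightPolicy">
  <camel:from ref="fileBufferFrom" />
  <!-- rest of the route unchanged -->
</camel:route>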

About the thread pools: take a closer look at the difference between
Abort and Reject. You would need to use the one that ensures a
rollback when the thread pool is full, so the preMove file gets
rolled back (assuming that logic works).




On Fri, Sep 7, 2012 at 3:54 PM, Ralf Steppacher
<ra...@derivativepartners.com> wrote:
> Hello Claus,
>
> If I want 5 files to be processed in parallel line by line, every line
> taken care of asynchronously, don't I need the
> <camel:threads ...><camel:split ...></camel:split></camel:threads>
> construct? If threads and split refer to the same thread pool, then of
> course I would not have to worry about finding the sweet spot in terms
> of the number of threads in the two pools, i.e. keeping all threads in
> the split pool as busy as possible with no thread in the threads pool
> waiting. I prefer to keep them apart because that way I have more
> control over the number of files and the order (small files first) in
> which they are processed.
>
> Using one thread pool would not solve my problem of the rejectedPolicy
> not being honored, would it? This really kills me. I am processing files
> ranging from 1 KB to 200 MB, sorted by their size. If the caller thread
> ends up with the 200 MB file, all other processing stalls until the big
> file has been processed. I need to prevent the processing of small files
> from being blocked by large files.
>
> Is the file consumer supposed to preMove all files from the input
> directory to the inprogress directory (2.10.0 behavior) or just the ones
> it is actually going to process, i.e. as many as there are threads
> (2.9.2 behavior)?
>
> With rejectedPolicy="Abort":
> If the 2.10.0 preMove behavior is the expected behavior, is it expected
> that all files that could not be processed on the first poll of the file
> endpoint (number of files > number of threads) stay in the inprogress
> directory forever, never being processed (2.10.0 behavior)? Or should
> they be picked up as soon as a thread becomes free and the file endpoint
> polls again (2.9.2 behavior with respective preMove behavior)?
>
>
> Thanks!
> Ralf
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: File and async/<camel:threads> woes

Posted by Ralf Steppacher <ra...@derivativepartners.com>.
Hello Claus,

If I want 5 files to be processed in parallel line by line, every line
taken care of asynchronously, don't I need the
<camel:threads ...><camel:split ...></camel:split></camel:threads>
construct? If threads and split refer to the same thread pool, then of
course I would not have to worry about finding the sweet spot in terms
of the number of threads in the two pools, i.e. keeping all threads in
the split pool as busy as possible with no thread in the threads pool
waiting. I prefer to keep them apart because that way I have more
control over the number of files and the order (small files first) in
which they are processed.

Using one thread pool would not solve my problem of the rejectedPolicy
not being honored, would it? This really kills me. I am processing files
ranging from 1 KB to 200 MB, sorted by their size. If the caller thread
ends up with the 200 MB file, all other processing stalls until the big
file has been processed. I need to prevent the processing of small files
from being blocked by large files.

Is the file consumer supposed to preMove all files from the input
directory to the inprogress directory (2.10.0 behavior) or just the ones
it is actually going to process, i.e. as many as there are threads
(2.9.2 behavior)?

With rejectedPolicy="Abort":
If the 2.10.0 preMove behavior is the expected behavior, is it expected
that all files that could not be processed on the first poll of the file
endpoint (number of files > number of threads) stay in the inprogress
directory forever, never being processed (2.10.0 behavior)? Or should
they be picked up as soon as a thread becomes free and the file endpoint
polls again (2.9.2 behavior with respective preMove behavior)?


Thanks!
Ralf


-----Original Message-----
From: Claus Ibsen <cl...@gmail.com>
Reply-to: users@camel.apache.org
To: users@camel.apache.org
Subject: Re: File and async/<camel:threads> woes
Date: Thu, 6 Sep 2012 11:30:29 +0200

Hi

Why make it so complicated with 2 thread pools? The splitter can just
refer to a custom thread pool profile / thread pool, which you can
customize as you want.

Also, the file consumer with preMove will move the file as soon as it
starts routing.






Re: File and async/<camel:threads> woes

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Why make it so complicated with 2 thread pools? The splitter can just
refer to a custom thread pool profile / thread pool, which you can
customize as you want.
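
A minimal sketch of what that could look like, untested. It assumes the
profile is declared inside the camelContext and that executorServiceRef
on the splitter resolves a thread pool profile id. The id "splitProfile"
and the pool sizes are made up for illustration.

<!-- "splitProfile" is a hypothetical id; sizes copied from the original route -->
<camel:threadPoolProfile id="splitProfile" poolSize="5" maxPoolSize="5"
    maxQueueSize="1" rejectedPolicy="Abort" />

<camel:route id="processFromFileBuffer">
  <camel:from ref="fileBufferFrom" />
  <camel:convertBodyTo type="java.io.InputStream" />
  <camel:split streaming="true" parallelProcessing="true"
      executorServiceRef="splitProfile">
    <camel:tokenize token="\r\n" />
    <camel:to ref="mq.csv" />
  </camel:split>
</camel:route>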

Also, the file consumer with preMove will move the file as soon as it
starts routing.




