Posted to users@camel.apache.org by neil Stevens <ne...@gmail.com> on 2015/01/09 13:17:20 UTC

Time consuming route starting at FTP endpoint

Hello,

I'm after some advice from people more experienced in Camel than
myself on what the correct way is to structure my route(s).  My
program essentially does the following:

1. Downloads file from FTP server
2. Saves a copy of the file (for auditing purposes)
3. Does some additional (time consuming) processing on the data

Therefore my code is like:

from("ftp://myserver/toProcess?move=processed")
.to("file://recievedData")
.to("direct://additionalProcessing");

I use the "move" option on the FTP component to move the file after
processing so that it is not processed again.  My problem is that, by
the time the processing (step 3) has completed, the FTP connection has
often been closed, so the file cannot be moved.  This results in the
following exception stack trace:

org.apache.camel.component.file.GenericFileOperationFailedException:
File operation failed: null Connection is not open. Code: 221
at org.apache.camel.component.file.remote.FtpOperations.buildDirectory(FtpOperations.java:290)
at org.apache.camel.component.file.strategy.GenericFileProcessStrategySupport.renameFile(GenericFileProcessStrategySupport.java:106)
at org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.commit(GenericFileRenameProcessStrategy.java:88)
at org.apache.camel.component.file.GenericFileOnCompletion.processStrategyCommit(GenericFileOnCompletion.java:124)
at org.apache.camel.component.file.GenericFileOnCompletion.onCompletion(GenericFileOnCompletion.java:80)
at org.apache.camel.component.file.GenericFileOnCompletion.onComplete(GenericFileOnCompletion.java:54)
at org.apache.camel.util.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:100)
at org.apache.camel.impl.DefaultUnitOfWork.done(DefaultUnitOfWork.java:228)
at org.apache.camel.util.UnitOfWorkHelper.doneUow(UnitOfWorkHelper.java:61)
at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:613)
at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:581)
at org.apache.camel.processor.CamelInternalProcessor$InternalCallback.done(CamelInternalProcessor.java:240)
at org.apache.camel.impl.MDCUnitOfWork$MDCCallback.done(MDCUnitOfWork.java:205)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:106)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:291)
at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:200)
at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:147)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.io.IOException: Connection is not open
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:474)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:608)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:582)
at org.apache.commons.net.ftp.FTP.pwd(FTP.java:1454)
at org.apache.commons.net.ftp.FTPClient.printWorkingDirectory(FTPClient.java:2658)
at org.apache.camel.component.file.remote.FtpOperations.buildDirectory(FtpOperations.java:267)
... 20 more

Therefore my thinking was I can just restructure the code so the file
is moved on the FTP server after I've saved a copy of it.  I hoped I
would be able to achieve this by putting in a SEDA component:

from("ftp://myserver/toProcess?move=processed")
.to("file://recievedData")
.to("seda://dataQueue");

from("seda://dataQueue")
.to("direct://additionalProcessing");

However, this does not achieve what I want.  The file is still not
renamed on the FTP server until "additionalProcessing" completes.  I
tried setting the SEDA component options "waitForTaskToComplete=Never"
and "exchangePattern=InOnly" in the hope that this would help, but it
does not.

I have thought of two ways to achieve what I want (with my current
understanding of Camel), but I don't think either is the correct
solution, and I have not tested either yet.  Solution 1 is to divide
the work into two routes and write the file to a temporary location:

from("ftp://myserver/toProcess?move=processed")
.to("file://tempLocation");

from("file://tempLocation")
.to("file://recievedData")
.to("direct://additionalProcessing");

This seems wrong as it adds extra I/O overhead.  Solution 2 is to
use the Wire Tap EIP as follows:

from("ftp://myserver/toProcess?move=processed")
.to("file://recievedData")
.wireTap("direct:additionalProcessing");

However, I keep getting the feeling that the above solution would be a
misuse of the Wire Tap EIP.  Does anyone have any idea what the
correct way to do this in Camel would be?
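
To make the intent of solution 2 concrete, here is a minimal plain-Java
sketch of the hand-off idea: the main flow passes a copy of the payload
to a worker thread and carries on, so the source file can be released
before the slow step finishes.  It is an analogy only, not Camel code
and not how Camel implements Wire Tap; the class name HandOffSketch and
the event strings are made up for illustration, and whether the FTP
consumer's move really fires at that point would still need to be
checked against the Camel version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java illustration; none of these names are Camel APIs.
class HandOffSketch {

    // Runs one "exchange" and returns the order in which the steps happened.
    static List<String> run() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch sourceReleased = new CountDownLatch(1);
        try {
            byte[] payload = {1, 2, 3};
            events.add("audit copy saved");

            // Hand a copy of the payload to the worker thread.
            byte[] copy = payload.clone();
            worker.submit(() -> {
                // Stand-in for slow work: this task cannot finish before
                // the main flow has released the source.
                sourceReleased.await();
                events.add("slow processing done");
                return copy.length;
            });

            // The main flow does not wait for the worker, so the source
            // can be released straight away.
            events.add("source file moved");
            sourceReleased.countDown();
        } finally {
            worker.shutdown();
            worker.awaitTermination(5, TimeUnit.SECONDS);
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

In this sketch "source file moved" is always recorded before "slow
processing done", which is the ordering I am hoping to get from the
route.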

Thanks in advance for any advice anyone can give me.

Neil

Re: Time consuming route starting at FTP endpoint

Posted by Carsten Ringe <ca...@kopis.de>.
Hi Neil,

You could also write your route so that it retrieves the file from FTP
and stores it somewhere under your control. If your processing fails
later, you can restart from there. This has the added benefit of
removing the file from the external FTP server in any case, so the
external party's storage is not cluttered if something on your side breaks.
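
As a rough illustration of that two-stage idea outside of Camel, the
plain-Java sketch below first stores the downloaded bytes in a local
staging directory and then processes the staged files in a separate
step, leaving a file in place when processing fails so it can be
retried. The class StagingSketch, its methods, the ".part" suffix and
the slowProcessing stand-in are all hypothetical names chosen for this
example.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical plain-Java illustration; none of these names are Camel APIs.
class StagingSketch {

    // Stage 1: store the data under our control. This is quick, so the
    // remote file can be moved or deleted right afterwards.
    static Path stage(Path stagingDir, String name, byte[] data) throws IOException {
        Files.createDirectories(stagingDir);
        Path partial = stagingDir.resolve(name + ".part");
        Files.write(partial, data);
        // Rename once complete so stage 2 never picks up a half-written file.
        return Files.move(partial, stagingDir.resolve(name),
                StandardCopyOption.REPLACE_EXISTING);
    }

    // Stage 2: process every staged file. Successful files are moved to
    // doneDir; failed ones stay in stagingDir for a later retry.
    static int processAll(Path stagingDir, Path doneDir) throws IOException {
        Files.createDirectories(doneDir);
        int processed = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(stagingDir,
                p -> Files.isRegularFile(p)
                        && !p.getFileName().toString().endsWith(".part"))) {
            for (Path file : files) {
                try {
                    slowProcessing(Files.readAllBytes(file));
                    Files.move(file, doneDir.resolve(file.getFileName()),
                            StandardCopyOption.REPLACE_EXISTING);
                    processed++;
                } catch (IllegalArgumentException e) {
                    // Processing failed: leave the file staged for a retry.
                }
            }
        }
        return processed;
    }

    // Stand-in for the time-consuming step; rejects empty input to
    // simulate a processing failure.
    static void slowProcessing(byte[] data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("empty file");
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("staging-sketch");
        stage(root.resolve("staging"), "a.txt",
                "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(processAll(root.resolve("staging"), root.resolve("done")));
    }
}
```

The trade-off is the extra local write, but a failure in the slow step
no longer affects the remote server, and the staged file remains
available for reprocessing.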



Carsten


On 09.01.2015 at 13:55, neil Stevens wrote:
> Thanks for the suggestion Morgan,
>
> I'll look into if this is possible but I'm not sure if it will be as
> we have to connect to several FTP servers which are not under our
> control.  Though if not I could extend the timeout to a very big value
> to solve the problem.
>
> I was just hoping Camel may provide a way to notify the start endpoint
> it can be released when the message gets to a specific point in the
> route.
>
> Neil


Re: Time consuming route starting at FTP endpoint

Posted by neil Stevens <ne...@gmail.com>.
Thanks for the suggestion Morgan,

I'll look into if this is possible but I'm not sure if it will be as
we have to connect to several FTP servers which are not under our
control.  Though if not I could extend the timeout to a very big value
to solve the problem.

I was just hoping Camel may provide a way to notify the start endpoint
it can be released when the message gets to a specific point in the
route.

Neil

On 9 January 2015 at 12:33, Morgan Hautman <mo...@gmail.com> wrote:
> Hi Neil,
>
> Maybe try to connect via SFTP since there is no data timeout value for it (under "timeout" option : For SFTP there is no data timeout.)
> http://camel.apache.org/ftp2.html
>
> Hope this helps!
>
> Regards
>
> Morgan

RE: Time consuming route starting at FTP endpoint

Posted by Morgan Hautman <mo...@gmail.com>.
Hi Neil,

Maybe try connecting via SFTP instead, since it has no data timeout (see the "timeout" option: "For SFTP there is no data timeout").
http://camel.apache.org/ftp2.html

Hope this helps!

Regards

Morgan
