You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Chris Wolf <cw...@gmail.com> on 2013/04/04 17:11:55 UTC
When is an (S)FTP file no longer "in progress"?
Hi,
I created a custom Processor with producer template to perform SFTP
rather than using the sftp component normally,
due to the lack of dynamic URI capability for consumer.
onent.file.remote.SftpConsumer TRACE Skipping as file is already in
progress: CBOE34_MKT_20120319_DAILY.csv
The type of consumer created is ScheduledBatchPollingConsumer - how do
I indicate a batch is done? Do I need to
suspend or stop this consumer? Or does the down-stream endpoint, i.e.
"to(...)", have to indicate a response
(acknowledgement) on the exchange of file received?
Thanks for any assistance with this matter...
-Chris
(Note, gmail text-plain will mangle the formatting of this code)
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
Map<String, Object> ftpProp = exchange.getIn().getBody(Map.class);
if (ftpProp == null)
throw new RuntimeCamelException("No object of type
Map<String, Object> in input exchange message.");
else
log.info("FTP Properites: {}", ftpProp);
Integer port = exchange.getIn().getHeader("SFTP_PORT",
Integer.class);
ftpProp.put("port", port);
CamelContext context = exchange.getContext();
//String routeId = exchange.getFromRouteId();
configure(context, ftpProp, endpointURIQueryString);
}
void configure(CamelContext context, Map<String, Object>
parameters, String queryStr) throws Exception {
parameters.put("separator", RemoteFileConfiguration.PathSeparator.UNIX);
parameters.put("binary", Boolean.TRUE);
//parameters.put("disconnect", Boolean.TRUE);
//parameters.put("passive", Boolean.TRUE);
String initialURI = String.format("sftp://%s/%s?%s",
parameters.get("host"), parameters.get("directory"), queryStr);
sftpEndpoint = context.getEndpoint(initialURI, SftpEndpoint.class);
sftpComponent = (SftpComponent) sftpEndpoint.getComponent();
SftpConfiguration conf = sftpEndpoint.getConfiguration();
// set reference properties first as they use # syntax that
fools the regular properties setter
EndpointHelper.setReferenceProperties(context, conf, parameters);
EndpointHelper.setProperties(context, conf, parameters);
EndpointHelper.setReferenceProperties(context, sftpEndpoint,
parameters);
EndpointHelper.setProperties(context, sftpEndpoint, parameters);
ServiceStatus status = sftpComponent.getStatus();
log.info("***************** Component: {}", status);
status = sftpEndpoint.getStatus();
log.info("***************** Endpoint: {}", status);
//EventDrivenPollingConsumer consumer = (EventDrivenPollingConsumer)
// sftpEndpoint.createPollingConsumer();
sftpConsumer = (SftpConsumer)
sftpEndpoint.createConsumer(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
producer.send(exchange); // send file down-stream...
}
});
Map<String, Object> cprop = sftpEndpoint.getConsumerProperties();
((ScheduledPollConsumer) sftpConsumer).setStartScheduler(true);
sftpConsumer.start();
status = sftpConsumer.getStatus();
log.info("***************** Consumer: {}", status);
}
/**
 * Sets the producer template stored on this instance.
 *
 * @param producer the producer template to store
 */
public void setProducer(ProducerTemplate producer) {
    this.producer = producer;
}
/**
 * Returns the endpoint URI query string currently stored on this instance.
 *
 * @return the stored query string
 */
public String getEndpointURIQueryString() {
    return this.endpointURIQueryString;
}
/**
 * Stores the endpoint URI query string on this instance.
 *
 * @param endpointURIQueryString the query string to store
 */
public void setEndpointURIQueryString(String endpointURIQueryString) {
    this.endpointURIQueryString = endpointURIQueryString;
}