You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Chris Wolf <cw...@gmail.com> on 2013/04/04 17:11:55 UTC

When is an (S)FTP file no longer "in progress"?

Hi,

I created a custom Processor with producer template to perform SFTP
rather then using the sftp component normally,
due to the lack of dynamic URI capability for consumer.

onent.file.remote.SftpConsumer TRACE Skipping as file is already in
progress: CBOE34_MKT_20120319_DAILY.csv


The type of consumer created is ScheduledBatchPollingConsumer - how do
I indicate a batch is done?  Do I need to
suspend or stop this consumer?  Or does the down-stream endpoint, i.e.
"to(...)", have to indicate a response
(acknowledgement) on the exchange of file received?

Thanks for any assistance with this matter...

   -Chris



(Note, gmail text-plain will mangle the formatting of this code)

    @Override
    public void process(Exchange exchange) throws Exception {
        @SuppressWarnings("unchecked")
        Map<String, Object> ftpProp = exchange.getIn().getBody(Map.class);
        if (ftpProp == null)
            throw new RuntimeCamelException("No object of type
Map<String, Object> in input exchange message.");
        else
            log.info("FTP Properites: {}", ftpProp);

        Integer port = exchange.getIn().getHeader("SFTP_PORT",
Integer.class);
        ftpProp.put("port", port);

        CamelContext context = exchange.getContext();
        //String routeId = exchange.getFromRouteId();
        configure(context, ftpProp, endpointURIQueryString);
    }

    void configure(CamelContext context, Map<String, Object>
parameters, String queryStr) throws Exception {
        parameters.put("separator", RemoteFileConfiguration.PathSeparator.UNIX);
        parameters.put("binary", Boolean.TRUE);
        //parameters.put("disconnect", Boolean.TRUE);
        //parameters.put("passive", Boolean.TRUE);
        String initialURI = String.format("sftp://%s/%s?%s",
parameters.get("host"), parameters.get("directory"), queryStr);
        sftpEndpoint = context.getEndpoint(initialURI, SftpEndpoint.class);
        sftpComponent = (SftpComponent) sftpEndpoint.getComponent();

        SftpConfiguration conf = sftpEndpoint.getConfiguration();

        // set reference properties first as they use # syntax that
fools the regular properties setter
        EndpointHelper.setReferenceProperties(context, conf, parameters);
        EndpointHelper.setProperties(context, conf, parameters);
        EndpointHelper.setReferenceProperties(context, sftpEndpoint,
parameters);
        EndpointHelper.setProperties(context, sftpEndpoint, parameters);

        ServiceStatus status = sftpComponent.getStatus();
        log.info("***************** Component: {}", status);

        status = sftpEndpoint.getStatus();
        log.info("***************** Endpoint: {}", status);
        //EventDrivenPollingConsumer consumer = (EventDrivenPollingConsumer)
        //    sftpEndpoint.createPollingConsumer();

        sftpConsumer = (SftpConsumer)
            sftpEndpoint.createConsumer(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                producer.send(exchange); // send file down-stream...
            }
        });
        Map<String, Object> cprop = sftpEndpoint.getConsumerProperties();
        ((ScheduledPollConsumer) sftpConsumer).setStartScheduler(true);
        sftpConsumer.start();
        status = sftpConsumer.getStatus();
        log.info("***************** Consumer: {}", status);
    }

    public void setProducer(ProducerTemplate producer) {
        this.producer = producer;
    }

    public String getEndpointURIQueryString() {
        return endpointURIQueryString;
    }

    public void setEndpointURIQueryString(String endpointURIQueryString) {
        this.endpointURIQueryString = endpointURIQueryString;
    }