You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Hugo Marcelino <hu...@gmail.com> on 2018/05/10 16:35:07 UTC

On completion failure when doing file processing

Hello,

I have inherited some code that is running Apache Camel and was asked if 
it was possible to send a notification whenever a processed file failed. 
I've found two links that looked promising to me.

1. https://issues.apache.org/jira/browse/CAMEL-3372
2. 
https://github.com/apache/camel/blob/master/camel-core/src/test/java/org/apache/camel/processor/FileRollbackOnCompletionTest.java 


The idea would be to use the ".onCompletion().onFailureOnly()" to react 
to failed file process and send a notification. But in fact what's 
happening is that the file is being moved to the failed folder but is 
not invoking my FileRollback:onFailure method.

I've built a test that is the closest as possible to our current code. 
The difference is that instead of being file:// is sftp:// but for the 
test case is not important.

Can you help me?

Thanks

public class FileProcessingFailedNotificationTest extends 
ContextTestSupport {

     private static final String basePath = "resources/";

     public void testSmoke() throws InterruptedException {
         Thread.sleep(10000);
     }

     @Override
     protected RouteBuilder createRouteBuilder() {

         return new RouteBuilder() {
             @Override
             public void configure() {
                 from("file://" + basePath + 
"?fileName=sample-in.csv&preMove=inprogress/&moveFailed=../failed/&move=../done/&readLock=changed") 

                         .to("direct:csvProcessor");

                 from("direct:csvProcessor")
                         .onCompletion().onFailureOnly()
                             .bean(FileRollback.class, "onFailure")
                         .end()
                         .process(new CSVProcessor())
                         .split(body()).streaming().parallelProcessing()
                         .process(new LineProcessor())
                         .end();
             }
         };
     }

     public static class FileRollback implements Synchronization {

         public void onComplete(Exchange exchange) {
             System.out.println("FileRollback:onComplete");
         }

         public void onFailure(Exchange exchange) {
             System.out.println("FileRollback:onFailure");
         }
     }

     private static class CSVProcessor implements Processor {

         @Override
         public void process(Exchange exchange) throws Exception {
             CsvDataFormat csvDataFormat = new CsvDataFormat();
             csvDataFormat.setDelimiter(',');
             csvDataFormat.setLazyLoad(true);
             csvDataFormat.setUseMaps(true);

             ServiceHelper.startService(csvDataFormat);

             InputStream stream = 
exchange.getIn().getMandatoryBody(InputStream.class);
             Message out = exchange.getOut();
             out.copyFrom(exchange.getIn());
             Object result = csvDataFormat.unmarshal(exchange, stream);
             out.setBody(result);
         }
     }

     private static class LineProcessor implements Processor {
         @Override
         public void process(Exchange exchange) {
             String body = exchange.getIn().getBody(String.class);

             if (body.contains("kaboom")) {
                 throw new RuntimeException("kaboom");

             } else {
                 System.out.println(body);
             }
         }
     }
}

Re: On completion failure when doing file processing

Posted by Quinn Stevenson <qu...@pronoia-solutions.com>.
Have you tried adding bridgeErrorHandler=true to the sftp operation?



> On May 10, 2018, at 10:35 AM, Hugo Marcelino <hu...@gmail.com> wrote:
> 
> Hello,
> 
> I have inherited some code that is running Apache Camel and was asked if it was possible to send a notification whenever a processed file failed. I've found two links that looked promising to me.
> 
> 1. https://issues.apache.org/jira/browse/CAMEL-3372
> 2. https://github.com/apache/camel/blob/master/camel-core/src/test/java/org/apache/camel/processor/FileRollbackOnCompletionTest.java 
> 
> The idea would be to use the ".onCompletion().onFailureOnly()" to react to failed file process and send a notification. But in fact what's happening is that the file is being moved to the failed folder but is not invoking my FileRollback:onFailure method.
> 
> I've built a test that is the closest as possible to our current code. The difference is that instead of being file:// is sftp:// but for the test case is not important.
> 
> Can you help me?
> 
> Thanks
> 
> public class FileProcessingFailedNotificationTest extends ContextTestSupport {
> 
>    private static final String basePath = "resources/";
> 
>    public void testSmoke() throws InterruptedException {
>        Thread.sleep(10000);
>    }
> 
>    @Override
>    protected RouteBuilder createRouteBuilder() {
> 
>        return new RouteBuilder() {
>            @Override
>            public void configure() {
>                from("file://" + basePath + "?fileName=sample-in.csv&preMove=inprogress/&moveFailed=../failed/&move=../done/&readLock=changed") 
>                        .to("direct:csvProcessor");
> 
>                from("direct:csvProcessor")
>                        .onCompletion().onFailureOnly()
>                            .bean(FileRollback.class, "onFailure")
>                        .end()
>                        .process(new CSVProcessor())
>                        .split(body()).streaming().parallelProcessing()
>                        .process(new LineProcessor())
>                        .end();
>            }
>        };
>    }
> 
>    public static class FileRollback implements Synchronization {
> 
>        public void onComplete(Exchange exchange) {
>            System.out.println("FileRollback:onComplete");
>        }
> 
>        public void onFailure(Exchange exchange) {
>            System.out.println("FileRollback:onFailure");
>        }
>    }
> 
>    private static class CSVProcessor implements Processor {
> 
>        @Override
>        public void process(Exchange exchange) throws Exception {
>            CsvDataFormat csvDataFormat = new CsvDataFormat();
>            csvDataFormat.setDelimiter(',');
>            csvDataFormat.setLazyLoad(true);
>            csvDataFormat.setUseMaps(true);
> 
>            ServiceHelper.startService(csvDataFormat);
> 
>            InputStream stream = exchange.getIn().getMandatoryBody(InputStream.class);
>            Message out = exchange.getOut();
>            out.copyFrom(exchange.getIn());
>            Object result = csvDataFormat.unmarshal(exchange, stream);
>            out.setBody(result);
>        }
>    }
> 
>    private static class LineProcessor implements Processor {
>        @Override
>        public void process(Exchange exchange) {
>            String body = exchange.getIn().getBody(String.class);
> 
>            if (body.contains("kaboom")) {
>                throw new RuntimeException("kaboom");
> 
>            } else {
>                System.out.println(body);
>            }
>        }
>    }
> }