You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Hugo Marcelino <hu...@gmail.com> on 2018/05/10 16:35:07 UTC
On completion failure when doing file processing
Hello,
I have inherited some code that is running Apache Camel and was asked if
it was possible to send a notification whenever a processed file failed.
I've found two links that looked promising to me.
1. https://issues.apache.org/jira/browse/CAMEL-3372
2.
https://github.com/apache/camel/blob/master/camel-core/src/test/java/org/apache/camel/processor/FileRollbackOnCompletionTest.java
The idea would be to use ".onCompletion().onFailureOnly()" to react
to a failed file process and send a notification. But what is actually
happening is that the file is moved to the failed folder without my
FileRollback:onFailure method being invoked.
I've built a test that is as close as possible to our current code.
The only difference is that our code uses sftp:// instead of file://, but
that is not important for the test case.
Can you help me?
Thanks
/**
 * Smoke test that wires a file consumer route into a {@code direct:} route which
 * carries a failure-only {@code onCompletion} callback, parses the file as CSV and
 * splits the parsed body into individual lines.
 *
 * <p>The test makes no assertions; it only keeps the context alive for a while so
 * the console output of the processors and of {@link FileRollback} can be inspected.
 */
public class FileProcessingFailedNotificationTest extends ContextTestSupport {

    /** Directory the file endpoint is pointed at. */
    private static final String BASE_PATH = "resources/";

    /** How long the smoke test waits before returning, in milliseconds. */
    private static final long SMOKE_WAIT_MILLIS = 10000;

    /**
     * Waits for a fixed period and then returns.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void testSmoke() throws InterruptedException {
        Thread.sleep(SMOKE_WAIT_MILLIS);
    }

    /**
     * Builds the two routes under test: the file pick-up route and the CSV
     * processing route it hands over to.
     *
     * @return the route builder defining both routes
     */
    @Override
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() {
                // Pick-up route: consume the sample file and forward it to the processing route.
                String inboxUri = "file://" + BASE_PATH
                        + "?fileName=sample-in.csv&preMove=inprogress/&moveFailed=../failed/&move=../done/&readLock=changed";
                from(inboxUri).to("direct:csvProcessor");

                // Processing route: failure-only completion callback, CSV parsing,
                // then a streaming, parallel split with one LineProcessor call per element.
                from("direct:csvProcessor")
                    .onCompletion().onFailureOnly()
                        .bean(FileRollback.class, "onFailure")
                    .end()
                    .process(new CSVProcessor())
                    .split(body()).streaming().parallelProcessing()
                        .process(new LineProcessor())
                    .end();
            }
        };
    }

    /** Completion callback that only reports on the console which hook was called. */
    public static class FileRollback implements Synchronization {

        /** Prints a marker line when the completion hook is called. */
        public void onComplete(Exchange exchange) {
            System.out.println("FileRollback:onComplete");
        }

        /** Prints a marker line when the failure hook is called. */
        public void onFailure(Exchange exchange) {
            System.out.println("FileRollback:onFailure");
        }
    }

    /**
     * Unmarshals the incoming stream with a freshly configured {@link CsvDataFormat}
     * and stores the result as the body of the out message.
     */
    private static class CSVProcessor implements Processor {

        /**
         * @param exchange exchange whose in body must be available as an {@link InputStream}
         * @throws Exception if the body is not available as a stream or unmarshalling fails
         */
        @Override
        public void process(Exchange exchange) throws Exception {
            CsvDataFormat format = new CsvDataFormat();
            format.setDelimiter(',');
            format.setLazyLoad(true);
            format.setUseMaps(true);
            // The data format is created by hand here, so it is started explicitly before use.
            ServiceHelper.startService(format);

            Message in = exchange.getIn();
            InputStream csv = in.getMandatoryBody(InputStream.class);

            // Copy the in message onto the out message first, then replace only the body.
            Message out = exchange.getOut();
            out.copyFrom(in);
            out.setBody(format.unmarshal(exchange, csv));
        }
    }

    /**
     * Prints each split element, or fails with a {@link RuntimeException} when the
     * element's text contains {@code kaboom}.
     */
    private static class LineProcessor implements Processor {

        /**
         * @param exchange exchange carrying one split element as its in body
         */
        @Override
        public void process(Exchange exchange) {
            String line = exchange.getIn().getBody(String.class);
            if (line.contains("kaboom")) {
                throw new RuntimeException("kaboom");
            }
            System.out.println(line);
        }
    }
}
Re: On completion failure when doing file processing
Posted by Quinn Stevenson <qu...@pronoia-solutions.com>.
Have you tried adding bridgeErrorHandler=true to the sftp operation?
> On May 10, 2018, at 10:35 AM, Hugo Marcelino <hu...@gmail.com> wrote:
>
> Hello,
>
> I have inherited some code that is running Apache Camel and was asked if it was possible to send a notification whenever a processed file failed. I've found two links that looked promising to me.
>
> 1. https://issues.apache.org/jira/browse/CAMEL-3372
> 2. https://github.com/apache/camel/blob/master/camel-core/src/test/java/org/apache/camel/processor/FileRollbackOnCompletionTest.java
>
> The idea would be to use the ".onCompletion().onFailureOnly()" to react to failed file process and send a notification. But in fact what's happening is that the file is being moved to the failed folder but is not invoking my FileRollback:onFailure method.
>
> I've built a test that is the closest as possible to our current code. The difference is that instead of being file:// is sftp:// but for the test case is not important.
>
> Can you help me?
>
> Thanks
>
> public class FileProcessingFailedNotificationTest extends ContextTestSupport {
>
> private static final String basePath = "resources/";
>
> public void testSmoke() throws InterruptedException {
> Thread.sleep(10000);
> }
>
> @Override
> protected RouteBuilder createRouteBuilder() {
>
> return new RouteBuilder() {
> @Override
> public void configure() {
> from("file://" + basePath + "?fileName=sample-in.csv&preMove=inprogress/&moveFailed=../failed/&move=../done/&readLock=changed")
> .to("direct:csvProcessor");
>
> from("direct:csvProcessor")
> .onCompletion().onFailureOnly()
> .bean(FileRollback.class, "onFailure")
> .end()
> .process(new CSVProcessor())
> .split(body()).streaming().parallelProcessing()
> .process(new LineProcessor())
> .end();
> }
> };
> }
>
> public static class FileRollback implements Synchronization {
>
> public void onComplete(Exchange exchange) {
> System.out.println("FileRollback:onComplete");
> }
>
> public void onFailure(Exchange exchange) {
> System.out.println("FileRollback:onFailure");
> }
> }
>
> private static class CSVProcessor implements Processor {
>
> @Override
> public void process(Exchange exchange) throws Exception {
> CsvDataFormat csvDataFormat = new CsvDataFormat();
> csvDataFormat.setDelimiter(',');
> csvDataFormat.setLazyLoad(true);
> csvDataFormat.setUseMaps(true);
>
> ServiceHelper.startService(csvDataFormat);
>
> InputStream stream = exchange.getIn().getMandatoryBody(InputStream.class);
> Message out = exchange.getOut();
> out.copyFrom(exchange.getIn());
> Object result = csvDataFormat.unmarshal(exchange, stream);
> out.setBody(result);
> }
> }
>
> private static class LineProcessor implements Processor {
> @Override
> public void process(Exchange exchange) {
> String body = exchange.getIn().getBody(String.class);
>
> if (body.contains("kaboom")) {
> throw new RuntimeException("kaboom");
>
> } else {
> System.out.println(body);
> }
> }
> }
> }