Posted to issues@camel.apache.org by "michael elbaz (JIRA)" <ji...@apache.org> on 2018/10/28 18:07:00 UTC
[jira] [Updated] (CAMEL-12906) Strange comportement with aggregator
[ https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
michael elbaz updated CAMEL-12906:
----------------------------------
Description:
/!\ This is about the Camel aggregator and error handling.
My case concerns graceful shutdown and error handling when an aggregator is in the route. First example:
In this example I get a *RejectedExecutionException* during shutdown, and the exception is not caught by the *onException* clause, so the message is simply lost.
{code:java}
@Component
public class AmqRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        errors();

        from("timer:foo?period=1000")
            .log("1")
            .transform().body(() -> "DATA " + RandomStringUtils.randomAlphanumeric(10))
            .convertBodyTo(String.class)
            .to(amq());

        from(amq()).to("direct:foo");

        from("direct:foo")
            .delay(3000)
            .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                .completionSize(10)
                .forceCompletionOnStop()
                .log("${body}");
    }

    public void errors() {
        onException(Exception.class)
            .useOriginalMessage()
            .to("activemq:recovery")
            .handled(true)
            .onRedelivery(exchange -> System.err.println("push to amq"));

        errorHandler(deadLetterChannel("log:dead?level=ERROR"));
    }

    private static String amq() {
        String amq = "activemq:data";
        amq += "?transacted=true";
        return amq;
    }
}
{code}
In the second example, the exception is caught:
{code:java}
@Component
public class AmqRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // errors();

        from("timer:foo?period=1000")
            .log("1")
            .transform().body(() -> "DATA " + RandomStringUtils.randomAlphanumeric(10))
            .convertBodyTo(String.class)
            .to(amq());

        from(amq()).to("direct:foo");

        from("direct:foo")
            .delay(3000)
            .doTry()
                .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                    .completionSize(10)
                    .forceCompletionOnStop()
                    .log("${body}")
            .endDoTry()
            .doCatch(Exception.class)
                .to("activemq:recovery")
                .log(LoggingLevel.ERROR, "!!!!!!!!!!!!!!!!")
            .end();
    }

    public void errors() {
        onException(Exception.class)
            .useOriginalMessage()
            .to("activemq:recovery")
            .handled(true)
            .onRedelivery(exchange -> System.err.println("push to amq"));

        errorHandler(deadLetterChannel("log:dead?level=ERROR"));
    }

    private static String amq() {
        String amq = "activemq:data";
        amq += "?transacted=true";
        return amq;
    }
}
{code}
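For readers less familiar with the exception named above: in the JDK, *RejectedExecutionException* is what an executor throws at the submitting caller once it has been shut down. The sketch below shows only that plain-Java behaviour with a standard ExecutorService. It does not use Camel, the class name RejectionDemo is made up for illustration, and whether the aggregator reaches the same code path during graceful shutdown is an assumption, not something this report confirms.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class, not part of the reported route.
public class RejectionDemo {

    // Returns true if submitting to an already shut-down executor is rejected.
    static boolean rejectedAfterShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown(); // from here on the pool accepts no new tasks
        try {
            pool.submit(() -> System.out.println("never runs"));
            return false;
        } catch (RejectedExecutionException e) {
            // The exception surfaces in the thread that tried to submit the task.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + rejectedAfterShutdown());
    }
}
```

The point of the sketch is that the rejection is raised synchronously at the caller, so whatever wraps that call (a doTry block, for instance) is the place that gets a chance to see it.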
was:
/!\ This is about the Camel aggregator and error handling.
My case concerns graceful shutdown and error handling when an aggregator is in the route. First example:
In this example I get a **RejectedExecutionException** during shutdown, and the exception is not caught by the **onException** clause.
{code:java}
@Component
public class AmqRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        errors();

        from("timer:foo?period=1000")
            .log("1")
            .transform().body(() -> "DATA " + RandomStringUtils.randomAlphanumeric(10))
            .convertBodyTo(String.class)
            .to(amq());

        from(amq()).to("direct:foo");

        from("direct:foo")
            .delay(3000)
            .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                .completionSize(10)
                .forceCompletionOnStop()
                .log("${body}");
    }

    public void errors() {
        onException(Exception.class)
            .useOriginalMessage()
            .to("activemq:recovery")
            .handled(true)
            .onRedelivery(exchange -> System.err.println("push to amq"));

        errorHandler(deadLetterChannel("log:dead?level=ERROR"));
    }

    private static String amq() {
        String amq = "activemq:data";
        amq += "?transacted=true";
        return amq;
    }
}
{code}
> Strange comportement with aggregator
> ------------------------------------
>
> Key: CAMEL-12906
> URL: https://issues.apache.org/jira/browse/CAMEL-12906
> Project: Camel
> Issue Type: Bug
> Components: camel-activemq
> Affects Versions: 2.22.1
> Reporter: michael elbaz
> Priority: Major
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)