Posted to dev@camel.apache.org by Christian Müller <ch...@gmail.com> on 2011/05/22 18:17:14 UTC

Missing feature to handle errors in a route which reads from an activemq destination

Hello!

To illustrate the missing feature, assume we have the following route:
from("activemq:queue:foo")
  .to("cxf:bean:orderService")
  .to("activemq:queue:bar");

Our orderService is out of order every day for 30 to 60 minutes. During this
time, we cannot process messages from the foo queue. We have to delay the
processing until the orderService returns to normal operation. At present we
have several ways to handle this requirement, but all of them have some
drawbacks, IMO.

1) Make the route transacted and let the ActiveMQ Broker do the (redelivery)
work.
from("activemq:queue:foo")
  .transacted("REQUIRED")
  .to("cxf:bean:orderService")
  .to("activemq:queue:bar");

Drawbacks:
- We are not the owner of this configuration and cannot ensure that the
ActiveMQ broker/JMS ConnectionFactory is configured properly for redelivery.
- We cannot use different redelivery policies for different routes.


2) Use the Camel dead letter error handler to catch the errors and do the
redelivery.
errorHandler(
  deadLetterChannel("activemq:queue:foo.DLQ")
  .maximumRedeliveries(8)
  .redeliveryDelay(60000)
  .useExponentialBackOff()
  .backOffMultiplier(2));

from("activemq:queue:foo")
  .to("cxf:bean:orderService")
  .to("activemq:queue:bar");

Drawbacks:
- By the seventh redelivery, we wait over one hour for the next redelivery
attempt. If we have to shut down our container (which is ServiceMix), the
shutdown either takes over one hour or we have to force it and lose the
message.
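
To make the "over one hour" figure concrete, here is a minimal sketch of the
back-off arithmetic in plain Java. It assumes the delay is multiplied after
every attempt and ignores any upper bound (Camel's redelivery policy has a
maximumRedeliveryDelay setting that may cap it). The class and method names
are made up for illustration and are not Camel API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper only; it is not part of Camel or ActiveMQ.
final class BackOffSchedule {

    // Delay in milliseconds before redelivery attempt number `attempt` (1-based),
    // assuming the delay is multiplied after every attempt and never capped.
    static long delayBefore(int attempt, long initialDelayMillis, double multiplier) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        double delay = initialDelayMillis;
        for (int i = 1; i < attempt; i++) {
            delay *= multiplier;
        }
        return (long) delay;
    }

    // Delays for attempts 1..maximumRedeliveries, in order.
    static List<Long> schedule(int maximumRedeliveries, long initialDelayMillis, double multiplier) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maximumRedeliveries; attempt++) {
            delays.add(delayBefore(attempt, initialDelayMillis, multiplier));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same numbers as the error handler above: 60 s initial delay, multiplier 2, 8 attempts.
        List<Long> delays = schedule(8, 60000L, 2.0);
        for (int i = 0; i < delays.size(); i++) {
            System.out.println("redelivery " + (i + 1) + ": " + delays.get(i) / 60000 + " min");
        }
    }
}
```

Under these assumptions the wait before the seventh attempt is 60000 * 2^6 ms,
which is 64 minutes, and the exchange stays in flight for that whole time.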


To solve the missing feature, we have at least two options:


a) Provide a "special" error handler which uses the "delay and schedule
message delivery" feature of ActiveMQ [3]. This error handler could look
like the RedeliveryErrorHandler:
errorHandler(
  activeMqDeadLetterChannel("activemq:queue:foo.DLQ")
  .maximumRedeliveries(8)
  .deliveryDelay(60000)
  .useExponentialBackOff()
  .backOffMultiplier(2));

from("activemq:queue:foo")
  .to("cxf:bean:orderService")
  .to("activemq:queue:bar");

If an error occurs, this error handler should evaluate the (JMS) headers to
decide whether the message should be redelivered and which delay to use,
update the (JMS) headers with the new values, and enqueue the message again
into the queue it was read from. This route should probably also be
transacted to make sure we do not lose messages.
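
The header evaluation described above could be sketched roughly as follows,
in plain Java without any Camel or JMS types. The header names and the
class are hypothetical and only stand in for whatever such an error handler
would really use; a plain Map plays the role of the message headers.

```java
import java.util.Map;

// Hypothetical sketch of the redelivery decision; not an existing Camel class.
final class RedeliveryDecision {

    // Assumed header names, chosen for this example only.
    static final String COUNTER = "redeliveryCounter";
    static final String DELAY = "redeliveryDelay";

    // Returns true and updates the headers if the message should be enqueued again;
    // returns false and leaves the headers untouched if it belongs in the dead letter queue.
    static boolean prepareRedelivery(Map<String, Object> headers, int maximumRedeliveries,
                                     long initialDelayMillis, double backOffMultiplier) {
        Object counter = headers.get(COUNTER);
        long count = (counter instanceof Number) ? ((Number) counter).longValue() : 0L;
        if (count >= maximumRedeliveries) {
            return false;
        }

        // First redelivery uses the initial delay, later ones multiply the previous delay.
        Object previous = headers.get(DELAY);
        long delay = (previous instanceof Number)
                ? (long) (((Number) previous).longValue() * backOffMultiplier)
                : initialDelayMillis;

        headers.put(COUNTER, count + 1);
        headers.put(DELAY, delay);
        return true;
    }
}
```

A real handler would additionally copy the computed delay into the broker's
scheduling header before sending the message back to the source queue.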


b) Add the options described in [1] and [2] to the activemq component to
allow the user to specify the redelivery behavior per endpoint definition:
from("activemq:queue:foo?maximumRedeliveries=10&deliveryDelay=60000&useExponentialBackOff=true&backOffMultiplier=2")
  .transacted("REQUIRED")
  .to("cxf:bean:orderService")
  .to("activemq:queue:bar");

These options should be used to configure the JMSConnection.
I know it's also possible to configure this in the
ActiveMQConnectionFactory, but:
- We are not the "owner" of this configuration. We use an OSGI lookup to get
the reference to the exported ActiveMQConnectionFactory.
- It's a global configuration and we may need the possibility to override
some options (deliveryDelay) for other routes.


Am I missing something? What are your thoughts?


[1] http://activemq.apache.org/redelivery-policy.html
[2] http://activemq.apache.org/message-redelivery-and-dlq-handling.html
[3] http://activemq.apache.org/delay-and-schedule-message-delivery.html

Looking forward to many opinions :-),
Christian

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Willem Jiang <wi...@gmail.com>.
Hi Christian,

You don't need to register the ActiveMQ component under the name
activemq.

You can configure a second ActiveMQ component for redelivery, for example
under the name redeliveryActivemq, in your Spring configuration. Then you can
use a different configuration on its ActiveMQConnectionFactory.





-- 
Willem
----------------------------------
FuseSource
Web: http://www.fusesource.com
Blog:    http://willemjiang.blogspot.com (English)
          http://jnn.javaeye.com (Chinese)
Twitter: willemjiang


Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by boday <be...@initekconsulting.com>.
Christian, great question... I see various forms of this asked many times.
I've considered all the options you mentioned, but for high-volume/critical
messaging scenarios, I struggle with the idea of leaving messages up to the
AMQ config/inflight repositories, for lack of control/visibility/testability,
etc.

For these reasons, I generally end up doing something like this: catch all
errors and send them to a separate queue to be periodically retried,
independent of the main route. I like this mainly because it's simple/clean,
puts all errors in a queue for triage (viewing, clearing, altering, manually
retrying, etc.), and has a separate (retry) route that can be
stopped/started, etc.

Basically, something like this...

//handle any errors by simply moving them to an error queue (for retry later);
//global onException clauses must be declared before the routes
onException(Exception.class)
    .handled(true).to("activemq:queue:fooErrorQ");

//main route to process message from a queue (needs to be fast)
from("activemq:queue:foo")
  .to("cxf:bean:orderService")
  .to("activemq:queue:bar");

//retry the error queue every 10 minutes (period is in milliseconds)
from("timer://retryTimer?fixedRate=true&period=600000")
    .pollEnrich("activemq:queue:fooErrorQ")
    .to("cxf:bean:orderService")
    .to("activemq:queue:bar");

That being said, this seems a bit archaic/simplistic given all the other
options.  Either way, I'm interested in other takes on this as well...
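
The flow of this "park and retry" pattern can be simulated without a broker.
The following is only a sketch with in-memory queues; the class name is made
up, and a Predicate stands in for the call to the orderService.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// In-memory illustration of the error-queue pattern; no Camel or JMS involved.
final class ErrorQueueRetry {

    // Tries every message that is in `source` at the start of the pass exactly once.
    // Successes go to `output`, failures are parked on `errorQueue`.
    // `source` and `errorQueue` may be the same queue, which models the retry route.
    static void drain(Deque<String> source, Deque<String> output,
                      Deque<String> errorQueue, Predicate<String> service) {
        int pending = source.size();
        for (int i = 0; i < pending; i++) {
            String message = source.poll();
            if (service.test(message)) {
                output.add(message);
            } else {
                errorQueue.add(message);
            }
        }
    }

    public static void main(String[] args) {
        Deque<String> foo = new ArrayDeque<>();
        Deque<String> bar = new ArrayDeque<>();
        Deque<String> fooErrorQ = new ArrayDeque<>();
        foo.add("order-1");
        foo.add("order-2");

        drain(foo, bar, fooErrorQ, m -> false);        // service down: both messages are parked
        drain(fooErrorQ, bar, fooErrorQ, m -> true);   // retry pass once the service is back
        System.out.println("bar=" + bar + " fooErrorQ=" + fooErrorQ);
    }
}
```

Because each pass only handles the messages present when it starts, a message
that fails again during a retry pass waits for the next timer tick instead of
being retried in a tight loop.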



-----
Ben O'Day
IT Consultant -http://benoday.blogspot.com


Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Christian Müller <ch...@gmail.com>.
Thanks for your detailed answer Claus.
But that's what I meant by "it may work as a workaround", because for
each different ActiveMQComponent we create a new ActiveMQConnectionFactory.
Does this sound right to you? Shouldn't we support a more elegant and
resource-friendly solution? Something like:

from("activemq:queue:foo?redeliveryPolicy=#redeliveryPolicy")

Then we could still stick to only one ActiveMQConnectionFactory and
configure the ActiveMQConnection with the redelivery policy.

Thanks for sharing your thoughts,
Christian

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Claus Ibsen <cl...@gmail.com>.
On Sun, Jul 31, 2011 at 7:42 PM, Christian Müller
<ch...@gmail.com> wrote:
> Hello Claus!
>
> As you know, the redelivery logic is part of Apache ActiveMQ, but we cannot
> use it in Camel in the way I would like to. Having only one redelivery
> policy in place is not sufficient for us (if you configure the redelivery
> policy in your ActiveMQConnectionFactory). We have multiple applications
> sharing the same broker with different requirements (online, batch, ...).
>
> I know it's also possible to configure it on the ActiveMQConnection [1], but
> as far as I know we don't have this possibility in Camel right now. Correct
> me if I'm wrong. Maybe a better solution for my needs would be the
> possibility to configure the redelivery policy as an option in the
> ActiveMQComponent:
>
> from("activemq:queue:foo?redeliveryPolicy=#redeliveryPolicy")...
>
> or
>
> from("activemq:queue:foo?initialRedeliveryDelay=500&maximumRedeliveries=5&...")...
>
> But for this, I think I have to open a JIRA for ActiveMQ, right?
>
> [1] http://activemq.apache.org/message-redelivery-and-dlq-handling.html
>

You can set up a number of activemq components, one for each set of
redelivery settings, as shown here:
http://camel.apache.org/activemq

Then just give each <bean> a different id, and put the redelivery settings
in its brokerURL.

<bean id="activemq" ...>
</bean>

<bean id="activemq-batch" ...>
</bean>

<bean id="activemq-online" ...>
</bean>


Then in the Camel routes you pick the id you want to use

<from uri="activemq-online:queue:foo"/>
...


> Best,
> Christian
>
> On Sun, Jul 31, 2011 at 2:20 PM, Claus Ibsen <cl...@gmail.com> wrote:
>
>> Frankly I think the redelivery logic should be part of Apache ActiveMQ.
>>
>> Your solution is brittle in that you consume the message and then send
>> it back to the broker. What happens if you cannot send the message back
>> to the broker?
>> It would be better if the broker handled all this out of the box.
>>
>> ActiveMQ already has a redelivery policy where you can configure various
>> delay options such as exponential backoff etc.
>>
>> And there is a JIRA ticket for ActiveMQ to support asynchronous
>> scheduled redeliveries:
>> https://issues.apache.org/jira/browse/AMQ-1853
>> This has the potential to let consumers pick up the next message instead
>> of blocking while waiting to issue the redelivery.
>>
>>
>>
>> On Sun, Jul 31, 2011 at 1:49 PM, Christian Müller
>> <ch...@gmail.com> wrote:
>> > Hello!
>> >
>> > I didn't get as many responses as I had hoped. Maybe it was not clear
>> > what I am trying to do or what I am missing in Camel at present.
>> > Therefore I built a small unit test (and attached it) to demonstrate what
>> > I'm looking for (for convenience I also pasted the code into this mail at
>> > the end).
>> >
>> > I built my own (simple) redelivery processor which I use to re-enqueue
>> > messages into ActiveMQ after the default Camel error handler has kicked
>> > in and caught the exception. I use the same logic as Camel does (but much
>> > simpler for this test) to track the redelivery delay and count. But
>> > instead of waiting for the retry, I put the message back into ActiveMQ for
>> > redelivery and set the "AMQ_SCHEDULED_DELAY" header, which is used by
>> > ActiveMQ to delay the delivery of the message:
>> >
>> > // imports assume Camel 2.x (camel-test, activemq-camel) and JUnit 4
>> > import org.apache.activemq.camel.component.ActiveMQComponent;
>> > import org.apache.camel.EndpointInject;
>> > import org.apache.camel.Exchange;
>> > import org.apache.camel.builder.RouteBuilder;
>> > import org.apache.camel.component.mock.MockEndpoint;
>> > import org.apache.camel.impl.JndiRegistry;
>> > import org.apache.camel.processor.interceptor.Tracer;
>> > import org.apache.camel.test.junit4.CamelTestSupport;
>> > import org.junit.Before;
>> > import org.junit.Test;
>> >
>> > public class RedeliveryTest extends CamelTestSupport {
>> >
>> >     @EndpointInject(uri = "mock:intercept")
>> >     private MockEndpoint intercept;
>> >
>> >     @EndpointInject(uri = "mock:dlq")
>> >     private MockEndpoint dlq;
>> >
>> >     @Before
>> >     public void setUp() throws Exception {
>> >         disableJMX();
>> >
>> >         super.setUp();
>> >     }
>> >
>> >     @Test
>> >     public void redeliveryTest() throws Exception {
>> >         // route that moves dead-lettered messages to a mock for assertions
>> >         context.addRoutes(new RouteBuilder() {
>> >             @Override
>> >             public void configure() throws Exception {
>> >                 from("activemq:queue:dlq")
>> >                     .to("mock:dlq");
>> >             }
>> >         });
>> >
>> >         intercept.expectedMessageCount(6); // 1 + 5 redeliveries
>> >         dlq.expectedMessageCount(1);
>> >
>> >         template.sendBody("activemq:queue:start", "Hello Camel!");
>> >
>> >         intercept.assertIsSatisfied();
>> >         dlq.assertIsSatisfied();
>> >
>> >         // delays are 200, 400, 800, 1600, 3200 ms for the 5 redeliveries
>> >         Exchange exchange = dlq.getExchanges().get(0);
>> >         assertEquals(Long.valueOf(5), exchange.getIn().getHeader("CamelActiveMqRedeliveryCounter"));
>> >         assertEquals(Long.valueOf(3200L), exchange.getIn().getHeader("CamelActiveMqRedeliveryDelay"));
>> >     }
>> >
>> >     @Override
>> >     protected JndiRegistry createRegistry() throws Exception {
>> >         JndiRegistry registry = super.createRegistry();
>> >
>> >         // embedded broker with scheduler support, needed for AMQ_SCHEDULED_DELAY
>> >         ActiveMQComponent activemq = ActiveMQComponent.activeMQComponent(
>> >             "vm://localhost?broker.schedulerSupport=true&broker.persistent=false&broker.useJmx=false");
>> >         registry.bind("activemq", activemq);
>> >
>> >         return registry;
>> >     }
>> >
>> >     @Override
>> >     protected RouteBuilder createRouteBuilder() throws Exception {
>> >         return new RouteBuilder() {
>> >             @Override
>> >             public void configure() throws Exception {
>> >                 context.addInterceptStrategy(new Tracer());
>> >
>> >                 ActiveMqRedeliveryProcessor activeMqRedeliveryProcessor = new ActiveMqRedeliveryProcessor();
>> >                 activeMqRedeliveryProcessor.setRedeliveryEndpoint("activemq:queue:start");
>> >                 activeMqRedeliveryProcessor.setDeadLetterEndpoint("activemq:queue:dlq");
>> >                 activeMqRedeliveryProcessor.setRedeliveryDelay(200);
>> >                 activeMqRedeliveryProcessor.setBackOffMultiplier(2.0);
>> >                 activeMqRedeliveryProcessor.setMaximumRedeliveries(5);
>> >
>> >                 onException(Exception.class)
>> >                     .handled(true)
>> >                     .bean(activeMqRedeliveryProcessor)
>> >                     .end();
>> >
>> >                 from("activemq:queue:start").routeId("main")
>> >                     .to("mock:intercept")
>> >                     .throwException(new Exception("forced Exception for test!"));
>> >             }
>> >         };
>> >     }
>> > }
>> >
>> > And the ActiveMqRedeliveryProcessor is:
>> >
>> > // imports assume Camel 2.x (camel-jms) and ActiveMQ 5.4 or later
>> > import org.apache.activemq.ScheduledMessage;
>> > import org.apache.camel.Exchange;
>> > import org.apache.camel.ProducerTemplate;
>> > import org.apache.camel.component.jms.JmsMessage;
>> > import org.apache.camel.impl.DefaultProducerTemplate;
>> >
>> > public class ActiveMqRedeliveryProcessor {
>> >
>> >     private String redeliveryEndpoint;
>> >     private String deadLetterEndpoint;
>> >
>> >     private long redeliveryDelay = 0L;
>> >     private double backOffMultiplier = 1;
>> >     private int maximumRedeliveries = 0;
>> >
>> >     public void process(Exchange exchange) throws Exception {
>> >         JmsMessage message = exchange.getIn(JmsMessage.class);
>> >
>> >         Long delay = message.getHeader("CamelActiveMqRedeliveryDelay", Long.class);
>> >         Long redeliveryCount = message.getHeader("CamelActiveMqRedeliveryCounter", Long.class);
>> >
>> >         if (redeliveryCount == null) {
>> >             redeliveryCount = 0L;
>> >         }
>> >
>> >         ProducerTemplate template = new DefaultProducerTemplate(exchange.getContext());
>> >         template.start();
>> >         try {
>> >             if (redeliveryCount < maximumRedeliveries) {
>> >                 redeliveryCount = redeliveryCount + 1;
>> >                 message.setHeader("CamelActiveMqRedeliveryCounter", redeliveryCount);
>> >
>> >                 // first redelivery uses the initial delay, later ones back off
>> >                 if (delay == null) {
>> >                     delay = redeliveryDelay;
>> >                 } else {
>> >                     delay = (long) (delay * backOffMultiplier);
>> >                 }
>> >                 message.setHeader("scheduledJobId", null);
>> >                 message.setHeader("CamelActiveMqRedeliveryDelay", delay);
>> >                 // let the broker hold the message back for the computed delay
>> >                 message.setHeader(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
>> >
>> >                 template.send(redeliveryEndpoint, exchange);
>> >             } else {
>> >                 template.send(deadLetterEndpoint, exchange);
>> >             }
>> >         } finally {
>> >             // stop the template even if the send fails
>> >             template.stop();
>> >         }
>> >     }
>> >
>> >     public void setRedeliveryDelay(long redeliveryDelay) {
>> >         this.redeliveryDelay = redeliveryDelay;
>> >     }
>> >
>> >     public void setBackOffMultiplier(double backOffMultiplier) {
>> >         this.backOffMultiplier = backOffMultiplier;
>> >     }
>> >
>> >     public void setMaximumRedeliveries(int maximumRedeliveries) {
>> >         this.maximumRedeliveries = maximumRedeliveries;
>> >     }
>> >
>> >     public void setRedeliveryEndpoint(String redeliveryEndpoint) {
>> >         this.redeliveryEndpoint = redeliveryEndpoint;
>> >     }
>> >
>> >     public void setDeadLetterEndpoint(String deadLetterEndpoint) {
>> >         this.deadLetterEndpoint = deadLetterEndpoint;
>> >     }
>> > }
>> >
>> > I'm looking for a better integration of this feature into Camel, if other
>> > people also think this is a common use case (let ActiveMQ handle the
>> > redelivery instead of having long-lived inflight messages if the delay is
>> > more than a few minutes). This integration could look like:
>> >
>> > errorHandler(
>> >   activeMqDeadLetterChannel("activemq:queue:FOO", "activemq:queue:DLQ")
>> >   .maximumRedeliveries(8)
>> >   .deliveryDelay(60000)
>> >   .useExponentialBackOff()
>> >   .backOffMultiplier(2));
>> > Where "activemq:queue:FOO" is the queue for redelivery and
>> > "activemq:queue:DLQ" is the dead letter queue.
>> >
>> > I'm really interested in your opinions on whether this is also useful for
>> > other people or only a special requirement on my side.
>> >
>> > Best,
>> > Christian
>> >



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Christian Müller <ch...@gmail.com>.
For clarification:
We have multiple services/bundles running inside one ServiceMix instance,
sharing the same ActiveMQConnectionFactory, which is exported as an OSGi
service by the activemq-broker.xml configuration.
We could use a separate ActiveMQConnectionFactory for each service/bundle,
but this does not look like the right way to me. Maybe as a workaround until
we have a better solution...

Best,
Christian


Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Christian Müller <ch...@gmail.com>.
Hello Claus!

As you know, the redelivery logic is part of Apache ActiveMQ, but we cannot
use it in Camel in the way I would like to. Having only one redelivery
policy in place is not sufficient for us (if you configure the redelivery
policy in your ActiveMQConnectionFactory). We have multiple applications
sharing the same broker with different requirements (online, batch, ...).

I know it's also possible to configure it on the ActiveMQConnection [1], but
as far as I know we don't have this possibility in Camel right now. Correct
me if I'm wrong. Maybe a better solution for my needs would be the
possibility to configure the redelivery policy as an option in the
ActiveMQComponent:

from("activemq:queue:foo?redeliveryPolicy=#redeliveryPolicy")...

or

from("activemq:queue:foo?initialRedeliveryDelay=500&maximumRedeliveries=5&...")...

But for this, I think I have to open a JIRA for ActiveMQ, right?

[1] http://activemq.apache.org/message-redelivery-and-dlq-handling.html

Best,
Christian


Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Claus Ibsen <cl...@gmail.com>.
Frankly I think the redelivery logic should be part of Apache ActiveMQ.

Your solution is brittle in that you consume the message and then send it
back to the broker. What happens if you cannot send the message back to
the broker?
It would be better if the broker handled all this out of the box.

ActiveMQ already has a redelivery policy where you can configure various
delay options such as exponential backoff.

And there is a JIRA ticket for ActiveMQ to support asynchronous
scheduled redeliveries:
https://issues.apache.org/jira/browse/AMQ-1853
This has the potential to let consumers pick up the next message instead
of blocking while waiting to issue the redelivery.






-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Christian Müller <ch...@gmail.com>.
Hello!

I didn't get as many responses as I had hoped. Maybe it was not clear what
I am trying to do or what I am currently missing in Camel.
Therefore I built a small unit test (and attached it) to demonstrate what
I'm looking for (for convenience I also pasted the code at the end of this
mail).

I built my own (simple) redelivery processor which I use to re-enqueue
messages into ActiveMQ after the default Camel error handler has kicked in
and caught the exception. I use the same logic as Camel does (but much
simpler for this test) to track the redelivery delay and count. But instead
of waiting for the retry, I put the message back into ActiveMQ for
redelivery and set the "AMQ_SCHEDULED_DELAY" header, which ActiveMQ uses to
delay the delivery of the message:

import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.processor.interceptor.Tracer;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Before;
import org.junit.Test;

public class RedeliveryTest extends CamelTestSupport {

    @EndpointInject(uri = "mock:intercept")
    private MockEndpoint intercept;

    @EndpointInject(uri = "mock:dlq")
    private MockEndpoint dlq;

    @Before
    public void setUp() throws Exception {
        disableJMX();

        super.setUp();
    }

    @Test
    public void redeliveryTest() throws Exception {
        // Drain the dead letter queue into a mock so we can assert on it.
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("activemq:queue:dlq")
                    .to("mock:dlq");
            }
        });

        intercept.expectedMessageCount(6); // 1 + 5 redeliveries
        dlq.expectedMessageCount(1);

        template.sendBody("activemq:queue:start", "Hello Camel!");

        intercept.assertIsSatisfied();
        dlq.assertIsSatisfied();

        // After 5 redeliveries the delay has doubled from 200 ms to 3200 ms.
        Exchange exchange = dlq.getExchanges().get(0);
        assertEquals(Long.valueOf(5),
            exchange.getIn().getHeader("CamelActiveMqRedeliveryCounter"));
        assertEquals(Long.valueOf(3200L),
            exchange.getIn().getHeader("CamelActiveMqRedeliveryDelay"));
    }

    @Override
    protected JndiRegistry createRegistry() throws Exception {
        JndiRegistry registry = super.createRegistry();

        // Embedded broker; schedulerSupport=true is needed for delayed delivery.
        ActiveMQComponent activemq = ActiveMQComponent.activeMQComponent(
            "vm://localhost?broker.schedulerSupport=true&broker.persistent=false&broker.useJmx=false");
        registry.bind("activemq", activemq);

        return registry;
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                context.addInterceptStrategy(new Tracer());

                ActiveMqRedeliveryProcessor activeMqRedeliveryProcessor =
                    new ActiveMqRedeliveryProcessor();
                activeMqRedeliveryProcessor.setRedeliveryEndpoint("activemq:queue:start");
                activeMqRedeliveryProcessor.setDeadLetterEndpoint("activemq:queue:dlq");
                activeMqRedeliveryProcessor.setRedeliveryDelay(200);
                activeMqRedeliveryProcessor.setBackOffMultiplier(2.0);
                activeMqRedeliveryProcessor.setMaximumRedeliveries(5);

                // Hand every failed exchange to the redelivery processor.
                onException(Exception.class)
                    .handled(true)
                    .bean(activeMqRedeliveryProcessor)
                    .end();

                from("activemq:queue:start").routeId("main")
                    .to("mock:intercept")
                    .throwException(new Exception("forced Exception for test!"));
            }
        };
    }
}

And the ActiveMqRedeliveryProcessor is:

import org.apache.activemq.ScheduledMessage;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.impl.DefaultProducerTemplate;

public class ActiveMqRedeliveryProcessor {

    private String redeliveryEndpoint;
    private String deadLetterEndpoint;

    private long redeliveryDelay = 0L;
    private double backOffMultiplier = 1;
    private int maximumRedeliveries = 0;

    public void process(Exchange exchange) throws Exception {
        JmsMessage message = exchange.getIn(JmsMessage.class);

        // The redelivery state travels with the message as headers.
        Long delay = message.getHeader("CamelActiveMqRedeliveryDelay", Long.class);
        Long redeliveryCount =
            message.getHeader("CamelActiveMqRedeliveryCounter", Long.class);

        if (redeliveryCount == null) {
            redeliveryCount = 0L;
        }

        ProducerTemplate template = new DefaultProducerTemplate(exchange.getContext());
        template.start();
        try {
            if (redeliveryCount < maximumRedeliveries) {
                redeliveryCount = redeliveryCount + 1;
                message.setHeader("CamelActiveMqRedeliveryCounter", redeliveryCount);

                // First redelivery uses the initial delay, later ones back off.
                if (delay == null) {
                    delay = redeliveryDelay;
                } else {
                    delay = (long) (delay * backOffMultiplier);
                }
                // Clear the job id left over from the previous scheduled delivery.
                message.setHeader("scheduledJobId", null);
                message.setHeader("CamelActiveMqRedeliveryDelay", delay);
                message.setHeader(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);

                template.send(redeliveryEndpoint, exchange);
            } else {
                // Redeliveries exhausted: move the message to the dead letter queue.
                template.send(deadLetterEndpoint, exchange);
            }
        } finally {
            template.stop();
        }
    }

    public void setRedeliveryDelay(long redeliveryDelay) {
        this.redeliveryDelay = redeliveryDelay;
    }

    public void setBackOffMultiplier(double backOffMultiplier) {
        this.backOffMultiplier = backOffMultiplier;
    }

    public void setMaximumRedeliveries(int maximumRedeliveries) {
        this.maximumRedeliveries = maximumRedeliveries;
    }

    public void setRedeliveryEndpoint(String redeliveryEndpoint) {
        this.redeliveryEndpoint = redeliveryEndpoint;
    }

    public void setDeadLetterEndpoint(String deadLetterEndpoint) {
        this.deadLetterEndpoint = deadLetterEndpoint;
    }
}
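
To make the header bookkeeping easier to follow without a broker, here is a
minimal plain-Java sketch of the same counter/delay logic. It is only an
illustration: the class RedeliveryHeaderSketch, its methods and the use of a
plain Map in place of the JMS headers are assumptions made for this example
and are not part of Camel or ActiveMQ. Only the two header names are taken
from the processor above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RedeliveryHeaderSketch {

    // Header names copied from the processor above.
    static final String COUNTER = "CamelActiveMqRedeliveryCounter";
    static final String DELAY = "CamelActiveMqRedeliveryDelay";

    /**
     * Updates the headers for one failed delivery.
     * Returns true if the message would be re-enqueued with a delay,
     * false if it would go to the dead letter queue.
     */
    public static boolean onFailure(Map<String, Long> headers, long redeliveryDelay,
                                    double backOffMultiplier, int maximumRedeliveries) {
        long count = headers.getOrDefault(COUNTER, 0L);
        if (count >= maximumRedeliveries) {
            return false;
        }
        Long previous = headers.get(DELAY);
        // First redelivery uses the initial delay, later ones multiply the previous delay.
        long next = (previous == null) ? redeliveryDelay : (long) (previous * backOffMultiplier);
        headers.put(COUNTER, count + 1);
        headers.put(DELAY, next);
        return true;
    }

    /** Simulates a message that always fails and collects the delay of each redelivery. */
    public static List<Long> simulate(long redeliveryDelay, double backOffMultiplier,
                                      int maximumRedeliveries) {
        Map<String, Long> headers = new HashMap<>();
        List<Long> delays = new ArrayList<>();
        while (onFailure(headers, redeliveryDelay, backOffMultiplier, maximumRedeliveries)) {
            delays.add(headers.get(DELAY));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same settings as the unit test: 200 ms initial delay, multiplier 2, 5 redeliveries.
        System.out.println(simulate(200, 2.0, 5));
    }
}
```

With the settings from the unit test this prints [200, 400, 800, 1600, 3200],
which matches the counter of 5 and the delay of 3200 asserted on the message
that ends up in the dead letter queue.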

I'm looking for a better integration of this feature into Camel, if other
people also think this is a common use case (let ActiveMQ handle the
redelivery instead of having long-lived in-flight messages if the delay is
more than a few minutes). This integration could look like:

errorHandler(
  activeMqDeadLetterChannel("activemq:queue:FOO", "activemq:queue:DLQ")
  .maximumRedeliveries(8)
  .deliveryDelay(60000)
  .useExponentialBackOff()
  .backOffMultiplier(2));

Where "activemq:queue:FOO" is the queue for redelivery and
"activemq:queue:DLQ" is the dead letter queue.

I'm really interested in your opinions on whether this is also useful for
other people or whether it is only a special requirement on my side.
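
To show why delays of this size should not be waited out inside the
consumer, here is a small plain-Java sketch that works out the schedule for
the settings proposed above (60000 ms initial delay, multiplier 2, 8
redeliveries). The class BackOffSchedule is hypothetical, and it assumes the
delay simply doubles on every redelivery, as in my processor.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class BackOffSchedule {

    /** Delay before redelivery n (1-based): initialDelay * multiplier^(n-1). */
    public static List<Duration> schedule(long initialDelayMillis, double multiplier,
                                          int maximumRedeliveries) {
        List<Duration> delays = new ArrayList<>();
        double delay = initialDelayMillis;
        for (int i = 0; i < maximumRedeliveries; i++) {
            delays.add(Duration.ofMillis((long) delay));
            delay *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        List<Duration> delays = schedule(60000, 2.0, 8);
        Duration total = Duration.ZERO;
        for (int i = 0; i < delays.size(); i++) {
            total = total.plus(delays.get(i));
            System.out.printf("redelivery %d after %d min (cumulative %d min)%n",
                i + 1, delays.get(i).toMinutes(), total.toMinutes());
        }
    }
}
```

Under these assumptions the seventh redelivery is scheduled 64 minutes
after the previous attempt and the eighth 128 minutes after that, 255
minutes in total. A message that waits this long in memory blocks a clean
shutdown, while a message scheduled in the broker does not.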

Best,
Christian

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Christian Müller <ch...@gmail.com>.
Hello guys and thank you for your answers/opinions!

Sorry for my late reply, I was busy with some other tasks. But I didn't
forget you... ;-)

@Ben: Looks like we share the same thoughts. I think your suggestion is
really simple (I like simple solutions) and covers most of our
requirements. But I also see some improvements in my suggested solution:
- I would like to have the ability to use an exponential back-off delay per
message (make the first retry after a few seconds, the second after one
minute, ...). One of our services has a really high throughput and we have
to process the messages as early as possible when the back-end system is
available again.
- If we use this pattern in multiple services, I think it's a good idea to
generalize it and make it available without having to duplicate the error
route for each route which has this requirement.

@Willem: If I understood you correctly, you suggest something like this:
<bean id="activemq1"
class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="connectionFactory" ref="connectionFactory1" />
</bean>

<bean id="connectionFactory1"
class="org.apache.activemq.pool.PooledConnectionFactory">
  <constructor-arg
value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=10&amp;jms.redeliveryPolicy.deliveryDelay=60000&amp;jms.redeliveryPolicy.useExponentialBackOff=true&amp;jms.redeliveryPolicy.backOffMultiplier=2"/>
  <!-- some other properties -->
</bean>

<bean id="activemq2"
class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="connectionFactory" ref="connectionFactory2" />
</bean>

<bean id="connectionFactory2"
class="org.apache.activemq.pool.PooledConnectionFactory">
  <constructor-arg
value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=5&amp;jms.redeliveryPolicy.deliveryDelay=1000&amp;jms.redeliveryPolicy.useExponentialBackOff=true&amp;jms.redeliveryPolicy.backOffMultiplier=5"/>
  <!-- some other properties -->
</bean>

from("activemq1:queue:foo") // configured this ActiveMQConnectionFactory to
  .transacted("REQUIRED")
  .to("cxf:bean:orderService")
  .to("activemq1:queue:bar");

from("activemq2:queue:bar") // configured this ActiveMQConnectionFactory to
  .transacted("REQUIRED")
  .to("cxf:bean:orderService")
  .to("activemq2:queue:baz");

What I dislike about this solution is that we have to create a separate
ActiveMQConnectionFactory for each service which uses a different redelivery
policy. At present, we use an OSGi lookup in SMX to get the
ActiveMQConnectionFactory which is exported by SMX (from
activemq-broker.xml). I think this is more resource friendly...
But I will keep this in mind as a good alternative until we have a
solution like the one I suggested... :-)

@Charles: I hope we will find the time to discuss this in 3.5 weeks, when
you are here in Frankfurt with us, or two weeks earlier if you plan to
attend the FUSE Community Day in Frankfurt...
If I understood you correctly, you suggest the following:
public void configure() throws Exception {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(10);
    redeliveryPolicy.setRedeliveryDelay(60000l);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setBackOffMultiplier(2.0);

    TransactionErrorHandlerBuilder errorHandlerBuilder = new
TransactionErrorHandlerBuilder();
    errorHandlerBuilder.setRedeliveryPolicy(redeliveryPolicy);

    errorHandler(errorHandlerBuilder);

    from("activemq:queue:foo")
        .transacted("REQUIRED")
        .to("cxf:bean:orderService")
        .to("activemq:queue:bar");
}
I'm not sure I understand exactly how the TransactionErrorHandler works. If
we configure the redeliveryDelay (and all the other options), what happens
under the covers? Does the TransactionErrorHandler wait for this time before
it propagates the exception to the TransactionManager? If this is the case,
I have the same concerns as with the "normal" RedeliveryErrorHandler, which
I explained in my first post.


So I still think the new "delay and schedule message delivery" feature [1]
in ActiveMQ 5.4 is something we should support in Camel. I will work out a
simple example which will hopefully convince you. ;-)

[1] http://activemq.apache.org/delay-and-schedule-message-delivery.html

Best,
Christian

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Charles Moulliard <cm...@gmail.com>.
Hi Christian,

Why don't you simply define the redelivery policy on the
TransactionErrorHandler like this:

    <!-- use a transaction error handler -->
    <bean id="myErrorHandler"
class="org.apache.camel.spring.spi.TransactionErrorHandlerBuilder">
        <property name="redeliveryPolicy" ref="myPolicy"/>
    </bean>
    <!-- and let it try to redeliver up till 4 times -->
    <bean id="myPolicy" class="org.apache.camel.processor.RedeliveryPolicy">
        <property name="maximumRedeliveries" value="4"/>
    </bean>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <!-- disable JMX during testing -->
        <jmxAgent id="agent" disabled="true"/>
        <route errorHandlerRef="myErrorHandler">
            <!-- 1: from the jms queue -->
            <from uri="activemq:queue:okay"/>
            <!-- 2: mark this route as transacted -->
            <transacted/>
            <!-- 3: call our business logic that is myProcessor -->
            <process ref="myProcessor"/>
            <!-- 4: if success then send it to the mock -->
            <to uri="mock:result"/>
        </route>
    </camelContext>

Regards,

Charles Moulliard

Sr. Principal Solution Architect - FuseSource
Apache Committer

Blog : http://cmoulliard.blogspot.com
Twitter : http://twitter.com/cmoulliard
Linkedin : http://www.linkedin.com/in/charlesmoulliard
Skype: cmoulliard


