Posted to dev@camel.apache.org by Christian Müller <ch...@gmail.com> on 2011/06/23 13:36:10 UTC

Re: Missing feature to handle errors in a route which reads from an activemq destination

Hello guys and thank you for your answers/opinions!

Sorry for my late reply; I was busy with some other tasks. But I didn't
forget you... ;-)

@Ben: Looks like we share the same thoughts. I think your suggestion is
really simple (I like simple solutions) and covers most of our
requirements. But I also see some advantages in my suggested solution:
- I would like to have an exponential back-off delay per
message (make the first retry after a few seconds, the second after one
minute, ...). One of our services has a really high throughput and we have
to process the messages as early as possible when the back-end system is
available again.
- If we use this pattern in multiple services, I think it's a good idea to
generalize it and make it available without having to duplicate the error
route for each route which has this requirement.
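To make the per-message back-off concrete, here is a minimal sketch in plain Java of how such a delay schedule could be computed. BackOffSchedule and its method are hypothetical names used only for illustration; they are not part of Camel or ActiveMQ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not a Camel or ActiveMQ class.
final class BackOffSchedule {

    // Returns the delay in milliseconds to wait before each redelivery attempt.
    // The first attempt uses the initial delay; every later attempt multiplies
    // the previous delay by the back-off multiplier.
    static List<Long> delays(long initialDelayMillis, double backOffMultiplier, int maximumRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maximumRedeliveries; attempt++) {
            result.add(delay);
            delay = (long) (delay * backOffMultiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Start with a few seconds and let the delay grow with each retry.
        System.out.println(delays(5_000L, 2.0, 5));
    }
}
```

The point is that the schedule depends only on the attempt number of the individual message, so a high-throughput service is not held back by one failing message.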

@Willem: If I understood you correctly, you suggest something like this:
<bean id="activemq1"
class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="connectionFactory" ref="connectionFactory1" />
</bean>

<bean id="connectionFactory1"
class="org.apache.activemq.pool.PooledConnectionFactory">
  <constructor-arg
value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=10&amp;jms.redeliveryPolicy.deliveryDelay=60000&amp;jms.redeliveryPolicy.useExponentialBackOff=true&amp;jms.redeliveryPolicy.backOffMultiplier=2"/>
  <!-- some other properties -->
</bean>

<bean id="activemq2"
class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="connectionFactory" ref="connectionFactory2" />
</bean>

<bean id="connectionFactory2"
class="org.apache.activemq.pool.PooledConnectionFactory">
  <constructor-arg
value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=5&amp;jms.redeliveryPolicy.deliveryDelay=1000&amp;jms.redeliveryPolicy.useExponentialBackOff=true&amp;jms.redeliveryPolicy.backOffMultiplier=5"/>
  <!-- some other properties -->
</bean>

from("activemq1:queue:foo") // configured this ActiveMQConnectionFactory to
  .transacted("REQUIRED")
  .to("cxf:bean:orderService")
  .to("activemq1:queue:bar");

from("activemq2:queue:bar") // configured this ActiveMQConnectionFactory to
  .transacted("REQUIRED")
  .to("cxf:bean:orderService")
  .to("activemq2:queue:baz");

What I dislike about this solution is that we have to create a separate
ActiveMQConnectionFactory for each service which uses a different redelivery
policy. At present, we use an OSGi lookup in SMX to get the
ActiveMQConnectionFactory which is exported by SMX (from
activemq-broker.xml). I think this is more resource friendly...
But I will take this into account as a good alternative until we have a
solution like the one I suggested... :-)

@Charles: I hope we will find the time to discuss this in 3.5 weeks, if you
are here in Frankfurt with us - or two weeks before if you plan to attend
the FUSE Community day in Frankfurt...
If I understood you correctly, you suggest the following:
public void configure() throws Exception {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(10);
    redeliveryPolicy.setRedeliveryDelay(60000L);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setBackOffMultiplier(2.0);

    TransactionErrorHandlerBuilder errorHandlerBuilder = new TransactionErrorHandlerBuilder();
    errorHandlerBuilder.setRedeliveryPolicy(redeliveryPolicy);

    errorHandler(errorHandlerBuilder);

    from("activemq:queue:foo")
        .transacted("REQUIRED")
        .to("cxf:bean:orderService")
        .to("activemq:queue:bar");
}
I'm not sure I understand exactly how the TransactionErrorHandler works. If
we configure the redeliveryDelay (and all the other options), what happens
under the covers? Does the TransactionErrorHandler wait for this time before it
propagates the exception to the TransactionManager? If this is the case, I
have the same bad feeling as with the "normal" RedeliveryErrorHandler I
explained in my first post.


So I still think the new "delay and schedule message delivery" [1] from
ActiveMQ 5.4 brings a good feature we should support in Camel. I will work
out a simple example which will hopefully convince you. ;-)

[1] http://activemq.apache.org/delay-and-schedule-message-delivery.html
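As a rough sketch of the idea (plain Java, no Camel or ActiveMQ classes): instead of blocking while waiting for the retry, the failed message is sent back to the broker with a delay property, and a counter decides when to give up and dead-letter it. The class name ScheduledRedelivery and the two Camel* header names are assumptions chosen for illustration; "AMQ_SCHEDULED_DELAY" is the property the ActiveMQ scheduler reads.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: message headers are modelled as a plain Map.
final class ScheduledRedelivery {
    static final String COUNTER = "CamelActiveMqRedeliveryCounter";  // hypothetical header name
    static final String DELAY = "CamelActiveMqRedeliveryDelay";      // hypothetical header name
    static final String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY"; // read by the ActiveMQ scheduler

    private final long redeliveryDelay;
    private final double backOffMultiplier;
    private final int maximumRedeliveries;

    ScheduledRedelivery(long redeliveryDelay, double backOffMultiplier, int maximumRedeliveries) {
        this.redeliveryDelay = redeliveryDelay;
        this.backOffMultiplier = backOffMultiplier;
        this.maximumRedeliveries = maximumRedeliveries;
    }

    // Updates the headers for the next attempt.
    // Returns true if the message should be re-enqueued with a delay,
    // false if the redeliveries are exhausted and it should be dead-lettered.
    boolean prepareRedelivery(Map<String, Object> headers) {
        long count = headers.containsKey(COUNTER) ? (Long) headers.get(COUNTER) : 0L;
        if (count >= maximumRedeliveries) {
            return false;
        }
        Long previous = (Long) headers.get(DELAY);
        long delay = (previous == null) ? redeliveryDelay : (long) (previous * backOffMultiplier);
        headers.put(COUNTER, count + 1);
        headers.put(DELAY, delay);
        headers.put(AMQ_SCHEDULED_DELAY, delay);
        return true;
    }

    public static void main(String[] args) {
        ScheduledRedelivery policy = new ScheduledRedelivery(200L, 2.0, 5);
        Map<String, Object> headers = new HashMap<>();
        while (policy.prepareRedelivery(headers)) {
            System.out.println("re-enqueue with delay " + headers.get(AMQ_SCHEDULED_DELAY) + " ms");
        }
        System.out.println("dead-letter after " + headers.get(COUNTER) + " redeliveries");
    }
}
```

Because the state travels with the message, the consumer is free to pick up the next message immediately; the trade-off is that the message has to be sent back to the broker successfully.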

Best,
Christian

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Christian Müller <ch...@gmail.com>.
Thanks for your detailed answer, Claus.
But that's what I meant by "maybe it works as a workaround", because for
each different ActiveMQComponent we create a new ActiveMQConnectionFactory.
Does this sound right to you? Shouldn't we support a more elegant and
resource-friendly solution? Something like:

from("activemq:queue:foo?redeliveryPolicy=#redeliveryPolicy")

Then we can stick with only one ActiveMQConnectionFactory and
configure the ActiveMQConnection with the redelivery policy.

Thanks for sharing your thoughts,
Christian

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Claus Ibsen <cl...@gmail.com>.
On Sun, Jul 31, 2011 at 7:42 PM, Christian Müller
<ch...@gmail.com> wrote:
> Hello Claus!
>
> As you know, the redelivery logic is part of Apache ActiveMQ, but we cannot
> use it in Camel in the way I would like it. To only have one redelivery
> policy in place is not sufficient for us (if you configure the redelivery
> policy in your ActiveMQConnectionFactory). We have multiple applications
> sharing the same broker with different requirements (online, batch, ...).
>
> I know it's also possible to configure it on the ActiveMQConnection [1], but
> as far as I know we don't have this possibility in Camel right now. Correct me if
> I'm wrong. Maybe a better solution for my needs would be to have two
> possibilities to configure the redelivery policy as an option in the
> ActiveMqComponent:
>
> from("activemq:queue:foo?redeliveryPolicy=#redeliveryPolicy")...
>
> or
>
> from("activemq:queue:foo?initialRedeliveryDelay=500&maximumRedeliveries=5&...")...
>
> But for this, I think I have to open a JIRA for ActiveMQ, right?
>
> [1] http://activemq.apache.org/message-redelivery-and-dlq-handling.html
>

You can set up a number of activemq components, one for each set of
redelivery settings, as shown here:
http://camel.apache.org/activemq

Then just give each <bean> a different id, and have the brokerURL carry
the redelivery settings.

<bean id="activemq" ...>
</bean>

<bean id="activemq-batch" ...>
</bean>

<bean id="activemq-online" ...>
</bean>


Then in the Camel routes you pick the id you want to use:

<from uri="activemq-online:queue:foo"/>
...


> Best,
> Christian
>
> On Sun, Jul 31, 2011 at 2:20 PM, Claus Ibsen <cl...@gmail.com> wrote:
>
>> Frankly I think the redelivery logic should be part of Apache ActiveMQ.
>>
>> Your solution is brittle in the fact that you consume the message, and
>> then send it back to the broker. What happens if you cannot send the
>> message back to the broker?
>> It would be better if the broker handled all this out of the box.
>>
>> ActiveMQ already has a redelivery policy where you can configure various
>> delay options such as exponential backoff etc.
>>
>> And there is a JIRA ticket for ActiveMQ to support asynchronous
>> scheduled redeliveries:
>> https://issues.apache.org/jira/browse/AMQ-1853
>> This has the potential for consumers to pick up the next message, and
>> not block waiting to issue the redelivery.
>>
>>
>>
>> On Sun, Jul 31, 2011 at 1:49 PM, Christian Müller
>> <ch...@gmail.com> wrote:
>> > Hello!
>> >
>> > I didn't get as many responses as I hoped. Maybe it was not clear what I
>> > try to do or what I miss at present in Camel.
>> > Therefore I built a small unit test (and attached it) to demonstrate what
>> > I'm looking for (for convenience I also paste the code into this mail at
>> > the end).
>> >
>> > I built my own (simple) redelivery processor which I use to re-enqueue
>> > messages into ActiveMQ after the default Camel error handler has kicked
>> > in and caught the exception. I use the same logic as Camel does (but much
>> > simpler for this test) to track the redelivery delay and count. But
>> > instead of waiting for the retry, I put the message into ActiveMQ for
>> > redelivery and set the "AMQ_SCHEDULED_DELAY" header which is used by
>> > ActiveMQ to delay the delivery of the message:
>> >
>> > public class RedeliveryTest extends CamelTestSupport {
>> >
>> >     @EndpointInject(uri = "mock:intercept")
>> >     private MockEndpoint intercept;
>> >
>> >     @EndpointInject(uri = "mock:dlq")
>> >     private MockEndpoint dlq;
>> >
>> >     @Before
>> >     public void setUp() throws Exception {
>> >         disableJMX();
>> >
>> >         super.setUp();
>> >     }
>> >
>> >     @Test
>> >     public void redeliveryTest() throws Exception {
>> >         context.addRoutes(new RouteBuilder() {
>> >             @Override
>> >             public void configure() throws Exception {
>> >                 from("activemq:queue:dlq")
>> >                     .to("mock:dlq");
>> >             }
>> >         });
>> >
>> >         intercept.expectedMessageCount(6); // 1 + 5 redeliveries
>> >         dlq.expectedMessageCount(1);
>> >
>> >         template.sendBody("activemq:queue:start", "Hello Camel!");
>> >
>> >         intercept.assertIsSatisfied();
>> >         dlq.assertIsSatisfied();
>> >
>> >         Exchange exchange = dlq.getExchanges().get(0);
>> >         assertEquals(Long.valueOf(5),
>> >             exchange.getIn().getHeader("CamelActiveMqRedeliveryCounter"));
>> >         assertEquals(Long.valueOf(3200L),
>> >             exchange.getIn().getHeader("CamelActiveMqRedeliveryDelay"));
>> >     }
>> >
>> >     @Override
>> >     protected JndiRegistry createRegistry() throws Exception {
>> >         JndiRegistry registry = super.createRegistry();
>> >
>> >         ActiveMQComponent activemq = ActiveMQComponent.activeMQComponent(
>> >             "vm://localhost?broker.schedulerSupport=true&broker.persistent=false&broker.useJmx=false");
>> >         registry.bind("activemq", activemq);
>> >
>> >         return registry;
>> >     }
>> >
>> >     @Override
>> >     protected RouteBuilder createRouteBuilder() throws Exception {
>> >         return new RouteBuilder() {
>> >             @Override
>> >             public void configure() throws Exception {
>> >                 context.addInterceptStrategy(new Tracer());
>> >
>> >                 ActiveMqRedeliveryProcessor activeMqRedeliveryProcessor =
>> >                     new ActiveMqRedeliveryProcessor();
>> >                 activeMqRedeliveryProcessor.setRedeliveryEndpoint("activemq:queue:start");
>> >                 activeMqRedeliveryProcessor.setDeadLetterEndpoint("activemq:queue:dlq");
>> >                 activeMqRedeliveryProcessor.setRedeliveryDelay(200);
>> >                 activeMqRedeliveryProcessor.setBackOffMultiplier(2.0);
>> >                 activeMqRedeliveryProcessor.setMaximumRedeliveries(5);
>> >
>> >                 onException(Exception.class)
>> >                     .handled(true)
>> >                     .bean(activeMqRedeliveryProcessor)
>> >                     .end();
>> >
>> >                 from("activemq:queue:start").routeId("main")
>> >                     .to("mock:intercept")
>> >                     .throwException(new Exception("forced Exception for test!"));
>> >             }
>> >         };
>> >     }
>> > }
>> >
>> > And the ActiveMqRedeliveryProcessor is:
>> >
>> > public class ActiveMqRedeliveryProcessor {
>> >
>> >     private String redeliveryEndpoint;
>> >     private String deadLetterEndpoint;
>> >
>> >     private long redeliveryDelay = 0L;
>> >     private double backOffMultiplier = 1;
>> >     private int maximumRedeliveries = 0;
>> >
>> >     public void process(Exchange exchange) throws Exception {
>> >         JmsMessage message = exchange.getIn(JmsMessage.class);
>> >
>> >         Long delay = message.getHeader("CamelActiveMqRedeliveryDelay", Long.class);
>> >         Long redeliveryCount = message.getHeader("CamelActiveMqRedeliveryCounter", Long.class);
>> >
>> >         if (redeliveryCount == null) {
>> >             redeliveryCount = 0L;
>> >         }
>> >
>> >         ProducerTemplate template = new DefaultProducerTemplate(exchange.getContext());
>> >         template.start();
>> >
>> >         if (redeliveryCount < maximumRedeliveries) {
>> >             redeliveryCount = redeliveryCount + 1;
>> >             message.setHeader("CamelActiveMqRedeliveryCounter", redeliveryCount);
>> >
>> >             if (delay == null) {
>> >                 delay = redeliveryDelay;
>> >             } else {
>> >                 delay = (long) (delay * backOffMultiplier);
>> >             }
>> >             message.setHeader("scheduledJobId", null);
>> >             message.setHeader("CamelActiveMqRedeliveryDelay", delay);
>> >             message.setHeader(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
>> >
>> >             template.send(redeliveryEndpoint, exchange);
>> >         } else {
>> >             template.send(deadLetterEndpoint, exchange);
>> >         }
>> >         template.stop();
>> >     }
>> >
>> >     public void setRedeliveryDelay(long redeliveryDelay) {
>> >         this.redeliveryDelay = redeliveryDelay;
>> >     }
>> >
>> >     public void setBackOffMultiplier(double backOffMultiplier) {
>> >         this.backOffMultiplier = backOffMultiplier;
>> >     }
>> >
>> >     public void setMaximumRedeliveries(int maximumRedeliveries) {
>> >         this.maximumRedeliveries = maximumRedeliveries;
>> >     }
>> >
>> >     public void setRedeliveryEndpoint(String redeliveryEndpoint) {
>> >         this.redeliveryEndpoint = redeliveryEndpoint;
>> >     }
>> >
>> >     public void setDeadLetterEndpoint(String deadLetterEndpoint) {
>> >         this.deadLetterEndpoint = deadLetterEndpoint;
>> >     }
>> > }
>> >
>> > I'm looking for a better integration into Camel for this feature, if
>> > other people also think this is a common use case (let ActiveMQ handle the
>> > redelivery instead of having long-lived in-flight messages if the delay is
>> > more than a few minutes). This integration could look like:
>> >
>> > errorHandler(
>> >   activeMqDeadLetterChannel("activemq:queue:FOO", "activemq:queue:DLQ")
>> >   .maximumRedeliveries(8)
>> >   .deliveryDelay(60000)
>> >   .useExponentialBackOff()
>> >   .backOffMultiplier(2));
>> > Where "activemq:queue:FOO" is the queue for redelivery and
>> > "activemq:queue:DLQ" is the dead letter queue.
>> >
>> > I'm really interested in your opinions on whether this is also useful for
>> > other people or only a special requirement on my side.
>> >
>> > Best,
>> > Christian
>> >
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> FuseSource
>> Email: cibsen@fusesource.com
>> Web: http://fusesource.com
>> Twitter: davsclaus, fusenews
>> Blog: http://davsclaus.blogspot.com/
>> Author of Camel in Action: http://www.manning.com/ibsen/
>>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Christian Müller <ch...@gmail.com>.
For clarification:
We have multiple services/bundles running inside one ServiceMix instance and
sharing the same ActiveMQConnectionFactory, which is exported as an OSGi service
by the activemq-broker.xml configuration.
We could use a separate ActiveMQConnectionFactory for each service/bundle,
but this doesn't look like the right way to me. Maybe as a workaround until we
have a better solution...

Best,
Christian


Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Christian Müller <ch...@gmail.com>.
Hello Claus!

As you know, the redelivery logic is part of Apache ActiveMQ, but we cannot
use it in Camel in the way I would like it. To only have one redelivery
policy in place is not sufficient for us (if you configure the redelivery
policy in your ActiveMQConnectionFactory). We have multiple applications
sharing the same broker with different requirements (online, batch, ...).

I know it's also possible to configure it on the ActiveMQConnection [1], but
as far as I know we don't have this possibility in Camel right now. Correct me if
I'm wrong. Maybe a better solution for my needs would be to have two
possibilities to configure the redelivery policy as an option in the
ActiveMqComponent:

from("activemq:queue:foo?redeliveryPolicy=#redeliveryPolicy")...

or

from("activemq:queue:foo?initialRedeliveryDelay=500&maximumRedeliveries=5&...")...

But for this, I think I have to open a JIRA for ActiveMQ, right?

[1] http://activemq.apache.org/message-redelivery-and-dlq-handling.html
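To illustrate what the second variant would mean, here is a minimal plain-Java sketch that reads redelivery options from the query part of an endpoint URI. The class RedeliveryOptions and its methods are hypothetical and do not reflect how Camel actually resolves endpoint options; the option names are the ones proposed above and are not supported today.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not Camel's endpoint option handling.
final class RedeliveryOptions {

    // Splits the query part of an endpoint URI into key/value pairs.
    static Map<String, String> parseQuery(String uri) {
        Map<String, String> options = new LinkedHashMap<>();
        int queryStart = uri.indexOf('?');
        if (queryStart < 0 || queryStart == uri.length() - 1) {
            return options; // no query part, so no options
        }
        for (String pair : uri.substring(queryStart + 1).split("&")) {
            int equals = pair.indexOf('=');
            if (equals > 0) {
                options.put(pair.substring(0, equals), pair.substring(equals + 1));
            }
        }
        return options;
    }

    // Returns the option as a long, or the default when the option is absent.
    static long longOption(Map<String, String> options, String name, long defaultValue) {
        String value = options.get(name);
        return (value == null) ? defaultValue : Long.parseLong(value);
    }

    public static void main(String[] args) {
        Map<String, String> options =
            parseQuery("activemq:queue:foo?initialRedeliveryDelay=500&maximumRedeliveries=5");
        System.out.println(longOption(options, "initialRedeliveryDelay", 1000L));
        System.out.println(longOption(options, "maximumRedeliveries", 6L));
    }
}
```

With per-endpoint options like these, every route could carry its own policy while all routes keep sharing one ActiveMQConnectionFactory.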

Best,
Christian

On Sun, Jul 31, 2011 at 2:20 PM, Claus Ibsen <cl...@gmail.com> wrote:

> Frankly I think the redelivery logic should be part of Apache ActiveMQ.
>
> Your solution is brittle in the fact that you consume the message, and
> then send it back to the broker. What happens if you cannot send the
> message back to the broker?
> It would be better if the broker handled all this out of the box.
>
> ActiveMQ already has redelivery policy where you can configure various
> delays options such as exponential backoff etc.
>
> And there is a JIRA ticket for ActiveMQ to support asynchronous
> scheduled redeliveries.
> https://issues.apache.org/jira/browse/AMQ-1853
> Which has the potential for consumers to pickup the next message, and
> not block waiting to issue the redelivery.
>
>
>
> On Sun, Jul 31, 2011 at 1:49 PM, Christian Müller
> <ch...@gmail.com> wrote:
> > Hello!
> >
> > I didn't get so much responses as I hoped. May it was not clear what I
> try
> > to do or what I miss at present in Camel.
> > Therefore a build a small unit test (and attached it) to demonstrate what
> > I'm looking for (for convenience I also paste the code into this mail at
> the
> > end).
> >
> > I build my own (simple) redelivery processor which I use to re-enqueue
> > messages into ActiveMQ after the default Camel error handler was kicked
> in
> > and catched the exception. I use the same logic as Camel does (but much
> more
> > simpler for this test) to trace the redelivery delay and count. But
> instead
> > to wait for the retry, I put the message message into ActiveMQ for
> > redelivery and set the "AMQ_SCHEDULED_DELAY" header which is used by
> > ActiveMQ to delay the delivery of the message:
> >
> > public class RedeliveryTest extends CamelTestSupport {
> >
> >     @EndpointInject(uri = "mock:intercept")
> >     private MockEndpoint intercept;
> >
> >     @EndpointInject(uri = "mock:dlq")
> >     private MockEndpoint dlq;
> >
> >     @Before
> >     public void setUp() throws Exception {
> >         disableJMX();
> >
> >         super.setUp();
> >     }
> >
> >     @Test
> >     public void redelieryTest() throws Exception {
> >         context.addRoutes(new RouteBuilder() {
> >             @Override
> >             public void configure() throws Exception {
> >                 from("activemq:queue:dlq")
> >                     .to("mock:dlq");
> >             }
> >         });
> >
> >         intercept.expectedMessageCount(6); // 1 + 5 redeliveries
> >         dlq.expectedMessageCount(1);
> >
> >         template.sendBody("activemq:queue:start", "Hello Camel!");
> >
> >         intercept.assertIsSatisfied();
> >         dlq.assertIsSatisfied();
> >
> >         Exchange exchange = dlq.getExchanges().get(0);
> >         assertEquals(new Long(5),
> > exchange.getIn().getHeader("CamelActiveMqRedeliveryCounter"));
> >         assertEquals(new Long(3200l),
> > exchange.getIn().getHeader("CamelActiveMqRedeliveryDelay"));
> >     }
> >
> >     @Override
> >     protected JndiRegistry createRegistry() throws Exception {
> >         JndiRegistry registry = super.createRegistry();
> >
> >         ActiveMQComponent activemq =
> >

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Claus Ibsen <cl...@gmail.com>.
Frankly, I think the redelivery logic should be part of Apache ActiveMQ.

Your solution is brittle in that you consume the message and then send
it back to the broker. What happens if you cannot send the message back
to the broker?
It would be better if the broker handled all this out of the box.

ActiveMQ already has a redelivery policy where you can configure various
delay options, such as exponential backoff.

And there is a JIRA ticket for ActiveMQ to support asynchronous
scheduled redeliveries:
https://issues.apache.org/jira/browse/AMQ-1853
This has the potential to let consumers pick up the next message
instead of blocking while they wait to issue the redelivery.






-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: Missing feature to handle errors in a route which reads from an activemq destination

Posted by Christian Müller <ch...@gmail.com>.
Hello!

I didn't get as many responses as I had hoped. Maybe it was not clear what
I am trying to do or what I am currently missing in Camel.
Therefore I built a small unit test (and attached it) to demonstrate what
I'm looking for (for convenience I have also pasted the code into this mail
at the end).

I built my own (simple) redelivery processor, which I use to re-enqueue
messages into ActiveMQ after the default Camel error handler has kicked in
and caught the exception. I use the same logic as Camel does (but much
simpler for this test) to track the redelivery delay and count. But instead
of waiting for the retry, I put the message back into ActiveMQ for
redelivery and set the "AMQ_SCHEDULED_DELAY" header, which ActiveMQ uses
to delay the delivery of the message:

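import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.processor.interceptor.Tracer;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Before;
import org.junit.Test;

public class RedeliveryTest extends CamelTestSupport {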

    @EndpointInject(uri = "mock:intercept")
    private MockEndpoint intercept;

    @EndpointInject(uri = "mock:dlq")
    private MockEndpoint dlq;

    @Before
    public void setUp() throws Exception {
        disableJMX();

        super.setUp();
    }

    @Test
    public void redeliveryTest() throws Exception {
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("activemq:queue:dlq")
                    .to("mock:dlq");
            }
        });

        intercept.expectedMessageCount(6); // 1 + 5 redeliveries
        dlq.expectedMessageCount(1);

        template.sendBody("activemq:queue:start", "Hello Camel!");

        intercept.assertIsSatisfied();
        dlq.assertIsSatisfied();

        Exchange exchange = dlq.getExchanges().get(0);
        assertEquals(Long.valueOf(5), exchange.getIn().getHeader("CamelActiveMqRedeliveryCounter"));
        assertEquals(Long.valueOf(3200L), exchange.getIn().getHeader("CamelActiveMqRedeliveryDelay"));
    }

    @Override
    protected JndiRegistry createRegistry() throws Exception {
        JndiRegistry registry = super.createRegistry();

        // Embedded, non-persistent broker with the scheduler enabled so that delayed delivery works.
        ActiveMQComponent activemq = ActiveMQComponent.activeMQComponent(
            "vm://localhost?broker.schedulerSupport=true&broker.persistent=false&broker.useJmx=false");
        registry.bind("activemq", activemq);

        return registry;
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                context.addInterceptStrategy(new Tracer());

                ActiveMqRedeliveryProcessor activeMqRedeliveryProcessor = new ActiveMqRedeliveryProcessor();
                activeMqRedeliveryProcessor.setRedeliveryEndpoint("activemq:queue:start");
                activeMqRedeliveryProcessor.setDeadLetterEndpoint("activemq:queue:dlq");
                activeMqRedeliveryProcessor.setRedeliveryDelay(200);
                activeMqRedeliveryProcessor.setBackOffMultiplier(2.0);
                activeMqRedeliveryProcessor.setMaximumRedeliveries(5);

                onException(Exception.class)
                    .handled(true)
                    .bean(activeMqRedeliveryProcessor)
                    .end();

                from("activemq:queue:start").routeId("main")
                    .to("mock:intercept")
                    .throwException(new Exception("forced Exception for test!"));
            }
        };
    }
}

And the ActiveMqRedeliveryProcessor is:

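import org.apache.activemq.ScheduledMessage;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.impl.DefaultProducerTemplate;

public class ActiveMqRedeliveryProcessor {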

    private String redeliveryEndpoint;
    private String deadLetterEndpoint;

    private long redeliveryDelay = 0L;
    private double backOffMultiplier = 1;
    private int maximumRedeliveries = 0;

    public void process(Exchange exchange) throws Exception {
        JmsMessage message = exchange.getIn(JmsMessage.class);

        // Both headers are absent the first time a message fails.
        Long delay = message.getHeader("CamelActiveMqRedeliveryDelay", Long.class);
        Long redeliveryCount = message.getHeader("CamelActiveMqRedeliveryCounter", Long.class);

        if (redeliveryCount == null) {
            redeliveryCount = 0L;
        }

        ProducerTemplate template = new DefaultProducerTemplate(exchange.getContext());
        template.start();
        try {
            if (redeliveryCount < maximumRedeliveries) {
                redeliveryCount = redeliveryCount + 1;
                message.setHeader("CamelActiveMqRedeliveryCounter", redeliveryCount);

                // The first retry uses the configured delay; later retries back off.
                if (delay == null) {
                    delay = redeliveryDelay;
                } else {
                    delay = (long) (delay * backOffMultiplier);
                }
                // Reset the job id that a previous scheduled delivery may have set.
                message.setHeader("scheduledJobId", null);
                message.setHeader("CamelActiveMqRedeliveryDelay", delay);
                // ActiveMQ holds the message back for this many milliseconds.
                message.setHeader(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);

                template.send(redeliveryEndpoint, exchange);
            } else {
                // Retries are exhausted: hand the message to the dead letter queue.
                template.send(deadLetterEndpoint, exchange);
            }
        } finally {
            // Stop the template even if the send fails.
            template.stop();
        }
    }

    public void setRedeliveryDelay(long redeliveryDelay) {
        this.redeliveryDelay = redeliveryDelay;
    }

    public void setBackOffMultiplier(double backOffMultiplier) {
        this.backOffMultiplier = backOffMultiplier;
    }

    public void setMaximumRedeliveries(int maximumRedeliveries) {
        this.maximumRedeliveries = maximumRedeliveries;
    }

    public void setRedeliveryEndpoint(String redeliveryEndpoint) {
        this.redeliveryEndpoint = redeliveryEndpoint;
    }

    public void setDeadLetterEndpoint(String deadLetterEndpoint) {
        this.deadLetterEndpoint = deadLetterEndpoint;
    }
}
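
To make the numbers in the test easier to follow, here is a small
stand-alone sketch (plain Java, no Camel or ActiveMQ involved) that replays
only the delay arithmetic of the processor. The class name BackoffSchedule
and the method delays(...) are made up for this illustration; the logic is
meant to mirror the header handling above, nothing more.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {

    /** Returns the delay (in ms) attached to each redelivery, in order. */
    public static List<Long> delays(long redeliveryDelay, double backOffMultiplier, int maximumRedeliveries) {
        List<Long> result = new ArrayList<>();
        Long delay = null; // the delay header is absent on the first failure
        for (int count = 0; count < maximumRedeliveries; count++) {
            // Same rule as in the processor: configured delay first, then multiply.
            delay = (delay == null) ? redeliveryDelay : (long) (delay * backOffMultiplier);
            result.add(delay);
        }
        return result;
    }

    public static void main(String[] args) {
        // Settings used in RedeliveryTest: 200 ms, multiplier 2.0, 5 redeliveries.
        System.out.println(delays(200, 2.0, 5)); // prints [200, 400, 800, 1600, 3200]
    }
}
```

With the test settings the fifth and last redelivery carries a delay of
3200 ms, which is the value the test asserts on the message that ends up in
the dead letter queue.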

I'm looking for a better integration of this feature into Camel, if other
people also think this is a common use case (letting ActiveMQ handle the
redelivery instead of having long-lived in-flight messages when the delay
is more than a few minutes). The integration could look like this:

errorHandler(
  activeMqDeadLetterChannel("activemq:queue:FOO", "activemq:queue:DLQ")
  .maximumRedeliveries(8)
  .deliveryDelay(60000)
  .useExponentialBackOff()
  .backOffMultiplier(2));

Where "activemq:queue:FOO" is the queue for redelivery and
"activemq:queue:DLQ" is the dead letter queue.
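
To put a rough number on "more than a few minutes": the sketch below adds
up the delays for the settings in the proposal above (60000 ms, multiplier
2, 8 redeliveries). It assumes each delay is the previous one times the
multiplier and that no maximum delay caps the growth; the class
RedeliveryWindow and its method are hypothetical helpers for this
calculation, not part of Camel or ActiveMQ.

```java
public class RedeliveryWindow {

    /** Sum of all delays (in ms) when each delay is the previous one times the multiplier. */
    public static long totalDelayMillis(long deliveryDelay, double backOffMultiplier, int maximumRedeliveries) {
        long total = 0;
        double delay = deliveryDelay;
        for (int i = 0; i < maximumRedeliveries; i++) {
            total += (long) delay;      // wait before this redelivery
            delay *= backOffMultiplier; // next redelivery waits longer
        }
        return total;
    }

    public static void main(String[] args) {
        long total = totalDelayMillis(60000, 2, 8);
        // prints "15300000 ms, about 255 minutes"
        System.out.println(total + " ms, about " + total / 60000 + " minutes");
    }
}
```

Under these assumptions a message that keeps failing would stay in flight
for more than four hours if the delays were waited out inside the route,
which is why I would rather park it in the broker between attempts.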

I'm really interested in your opinions on whether this is also useful for
other people, or whether it is only a special requirement on my side.

Best,
Christian