You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Dragisa Krsmanovic <dk...@plos.org> on 2010/04/02 23:15:58 UTC

Message blocks route until all redelivery attempts are exhausted

I have a route that is configured to re-deliver messages on exception.
If there are messages in that queue and one of them fails, all other
messages are blocked until all re-delivery attempts are exhausted.

I would like other messages to go through while the ones that failed are
waiting for their re-delivery. Message order is not important. What is a
good way to do this ?


The route I tested with looks like this:

from("seda:start")
 .onException(Exception.class)
   .handled(true)
   .redeliverDelay(100)
   .maximumRedeliveries(1)
   .to("mock:error")
 .end()
 .process(new TestProcessor())
 .to("mock:end");


--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
This email is confidential to the intended recipient. If you have received it in error, please notify the sender and delete it from your
system. Any unauthorized use, disclosure or copying is not permitted. The views or opinions presented are solely those of the sender and do
not necessarily represent those of Public Library of Science unless otherwise specifically stated. Please note that neither Public Library
of Science nor any of its agents accept any responsibility for any viruses that may be contained in this e-mail or its attachments and it
is your responsibility to scan the e-mail and attachments (if any).

Re: Message blocks route until all redelivery attempts are exhausted

Posted by javfindly <ja...@findly.com>.
Whoever is facing this problem and cannot use async schedulling because the
endpoint is transacted, I've came up with a workaround, in my case I'm using
RabbitMQ as the from Endpoint:

This is my route configuration:

        from(fromEndpoint) // AMQP INPUT QUEUE
            .routeId(consumerRouteId)
            .to(DIRECT_SEND_DESTINATION);

        from(retryEndpoint) //AMQP RETRY QUEUE
            .errorHandler(deadLetterChannel) // a Dead Letter Channel with
Redelivery configuration
            .routeId(retryRouteid)
            .setHeader(MESSAGE_RETRY_HEADER, constant(true))
            .to(DIRECT_SEND_DESTINATION);

        from(DIRECT_SEND_DESTINATION) // Direct endpoints that work as a
funnel between the Input and Retry queue
            .doTry()
                ...
                .process(sendToDestinationProcessor) //If this throws an
exception we will decide what to do in the catch
                ...
            .doCatch(Exception.class)
                .process(redeliveryExceptionRethrowProcessor)
                .to(retryEndpoint) // Is the first time we hear about this
message, so we send (the original) to the retry queue
            .end();

And the redeliveryExceptionRethrowProcessor will decide if it needs to
rethrow the exception or not based on a header that is present just in the
Retry Messages. Note that the retryEndpoint route has the errorHandler
configuration, so it will handle redelivery from that queue only, not from
the Input queue.
This way, redelivery attempts are not blocking anymore, and we don't use an
async scheduler where messages can be lost in case of an outage.

public class RedeliveryExceptionRethrowProcessor implements Processor {

    private final String header;

    public RedeliveryExceptionRethrowProcessor(String header) {
        this.header = header;
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        Boolean rethrowException = exchange.getIn().getHeader(header,
Boolean.class);
        if (rethrowException != null && rethrowException) {
            throw exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
Exception.class);
        }
        exchange.setIn(exchange.getUnitOfWork().getOriginalInMessage());
    }

}

//If we are not rethrowing the exception, is the first time we see this
message, so it's going to be sent to the retry queue.
//For it to work, we need to use the original message, so any changes in the
exchange while trying to process it would be ignored



--
View this message in context: http://camel.465427.n5.nabble.com/Message-blocks-route-until-all-redelivery-attempts-are-exhausted-tp472319p5784827.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Message blocks route until all redelivery attempts are exhausted

Posted by Claus Ibsen <cl...@gmail.com>.
On Fri, Feb 4, 2011 at 3:36 PM, waterback <ma...@innoq.com> wrote:
>
> Hi Claus,
>
> i know this is a rather old post, but i have a question referring to your
> response:
>
> Does this mean, that if you have - lets say - 10 transacted routes, which
> all use the same RedeliveryHandler
> that if even if you don't have an exception that triggers a redelivery,
> those 10 routes won't get worked on in parallel?
>
> Or does this only come into effect when there's an exception?
>

The redelivery is scheduled in Camel 2.4 onwards, and you can use the
asyncDelayed option to allow asynchronous redelivery tasks.  See here:
http://camel.apache.org/delayer.html. The same options is avail on
error handler.

Then if there is a redelivery then its just add to the thread pool and
the thread can move on processing the next message.

Then its up to the redelivery thread pool how many redeliveries you
can process concurrently. Currently its configured to allow 10 threads
in that pool. We should offer an option so you can configure this as
well. I will create a JIRA for that.


> Thx,
> Martin
> --
> View this message in context: http://camel.465427.n5.nabble.com/Message-blocks-route-until-all-redelivery-attempts-are-exhausted-tp472319p3371056.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: Message blocks route until all redelivery attempts are exhausted

Posted by waterback <ma...@innoq.com>.
Hi Claus,

i know this is a rather old post, but i have a question referring to your
response:

Does this mean, that if you have - lets say - 10 transacted routes, which
all use the same RedeliveryHandler
that if even if you don't have an exception that triggers a redelivery,
those 10 routes won't get worked on in parallel?

Or does this only come into effect when there's an exception?

Thx,
Martin
-- 
View this message in context: http://camel.465427.n5.nabble.com/Message-blocks-route-until-all-redelivery-attempts-are-exhausted-tp472319p3371056.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Message blocks route until all redelivery attempts are exhausted

Posted by slew77 <sl...@googlemail.com>.
I have exactly the same problem.  I have many thousands of endpoints any
number of which could be down.  I need to be able to handle redelivery
without blocking good messages.

Wondering if you came up with a workaround for this?

Thanks,
Steve.


Claus Ibsen-2 wrote:
> 
> Hi
> 
> It depends on the situation but sometimes you could simply let it fail
> and rollback to the source.
> 
> But what you are looking for is currently not implemented in Camel.
> Having its RedeliveryErrorHandler
> support non blocking delays.
> 
> This is something we will add in the future. I was hoping I got a
> chance to work a bit on the internals
> to make the Channel concept more flexible and to support having a
> Queue which would allow you to send
> the Exchange to that said queue and have the Channel poll the
> Exchanges from the Queue.
> Then all is needed is to use a DelayedQueue which then have a delay
> value for the redelivery exchanges.
> 
> The same concept could be implemented directly in
> RedeliveryErrorHandler to get this implemented quicker.
> 
> The caveat when doing this is that its a new thread which will
> re-process the Exchange and thus you cannot do this
> if you have transactions enabled.
> 
> I will create a ticket in JIRA for this new feature.
> 
> 
> On Mon, Apr 5, 2010 at 7:22 PM, Dragisa Krsmanovic <dk...@plos.org>
> wrote:
>> Yes, but this only increases number of messages that are needed to
>> "block" the route. What if I have thousands of messages and somewhere
>> between 1 and 1000 can fail ? I don't want "good" messages to wait for
>> "bad" messages to exhaust their retries.
>>
>> If I would set concurrentConsumers=2000 that can potentially spawn 2000
>> threads ?
>>
>> I tried a solution by sending bad messages to another, "waiting" route.
>> But, although good messages are not blocked any more, I still have
>> similar issue in the "waiting" route where I want to use delay. It can
>> only delay number of messages that is equal to concurrentConsumers. So,
>> "bad" messages at the end of the wait queue would wait, not delay() but
>> (n/concurrentConsumers) * delay() amount of time.
>>
>>
>>
>> On Sat, 2010-04-03 at 10:10 +0800, Willem Jiang wrote:
>>> Hi,
>>>
>>> Can you try to add this option "concurrentConsumers=5" into the seda
>>> endpoint's URI?
>>> By default there is only one thread to consumer the message in the
>>> queue.
>>>
>>> Willem
>>>
>>> Dragisa Krsmanovic wrote:
>>> > I have a route that is configured to re-deliver messages on exception.
>>> > If there are messages in that queue and one of them fails, all other
>>> > messages are blocked until all re-delivery attempts are exhausted.
>>> >
>>> > I would like other messages to go through while the ones that failed
>>> are
>>> > waiting for their re-delivery. Message order is not important. What is
>>> a
>>> > good way to do this ?
>>> >
>>> >
>>> > The route I tested with looks like this:
>>> >
>>> > from("seda:start")
>>> >  .onException(Exception.class)
>>> >    .handled(true)
>>> >    .redeliverDelay(100)
>>> >    .maximumRedeliveries(1)
>>> >    .to("mock:error")
>>> >  .end()
>>> >  .process(new TestProcessor())
>>> >  .to("mock:end");
>>> >
>>> >
>>> >
>>> >
>>>
>>
>>
>> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>> This email is confidential to the intended recipient. If you have
>> received it in error, please notify the sender and delete it from your
>> system. Any unauthorized use, disclosure or copying is not permitted. The
>> views or opinions presented are solely those of the sender and do
>> not necessarily represent those of Public Library of Science unless
>> otherwise specifically stated. Please note that neither Public Library
>> of Science nor any of its agents accept any responsibility for any
>> viruses that may be contained in this e-mail or its attachments and it
>> is your responsibility to scan the e-mail and attachments (if any).
>>
> 
> 
> 
> -- 
> Claus Ibsen
> Apache Camel Committer
> 
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
> 
> 

-- 
View this message in context: http://old.nabble.com/Message-blocks-route-until-all-redelivery-attempts-are-exhausted-tp28123035p28667141.html
Sent from the Camel - Users mailing list archive at Nabble.com.


Re: Message blocks route until all redelivery attempts are exhausted

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

It depends on the situation but sometimes you could simply let it fail
and rollback to the source.

But what you are looking for is currently not implemented in Camel.
Having its RedeliveryErrorHandler
support non blocking delays.

This is something we will add in the future. I was hoping I got a
chance to work a bit on the internals
to make the Channel concept more flexible and to support having a
Queue which would allow you to send
the Exchange to that said queue and have the Channel poll the
Exchanges from the Queue.
Then all is needed is to use a DelayedQueue which then have a delay
value for the redelivery exchanges.

The same concept could be implemented directly in
RedeliveryErrorHandler to get this implemented quicker.

The caveat when doing this is that its a new thread which will
re-process the Exchange and thus you cannot do this
if you have transactions enabled.

I will create a ticket in JIRA for this new feature.


On Mon, Apr 5, 2010 at 7:22 PM, Dragisa Krsmanovic <dk...@plos.org> wrote:
> Yes, but this only increases number of messages that are needed to
> "block" the route. What if I have thousands of messages and somewhere
> between 1 and 1000 can fail ? I don't want "good" messages to wait for
> "bad" messages to exhaust their retries.
>
> If I would set concurrentConsumers=2000 that can potentially spawn 2000
> threads ?
>
> I tried a solution by sending bad messages to another, "waiting" route.
> But, although good messages are not blocked any more, I still have
> similar issue in the "waiting" route where I want to use delay. It can
> only delay number of messages that is equal to concurrentConsumers. So,
> "bad" messages at the end of the wait queue would wait, not delay() but
> (n/concurrentConsumers) * delay() amount of time.
>
>
>
> On Sat, 2010-04-03 at 10:10 +0800, Willem Jiang wrote:
>> Hi,
>>
>> Can you try to add this option "concurrentConsumers=5" into the seda
>> endpoint's URI?
>> By default there is only one thread to consumer the message in the queue.
>>
>> Willem
>>
>> Dragisa Krsmanovic wrote:
>> > I have a route that is configured to re-deliver messages on exception.
>> > If there are messages in that queue and one of them fails, all other
>> > messages are blocked until all re-delivery attempts are exhausted.
>> >
>> > I would like other messages to go through while the ones that failed are
>> > waiting for their re-delivery. Message order is not important. What is a
>> > good way to do this ?
>> >
>> >
>> > The route I tested with looks like this:
>> >
>> > from("seda:start")
>> >  .onException(Exception.class)
>> >    .handled(true)
>> >    .redeliverDelay(100)
>> >    .maximumRedeliveries(1)
>> >    .to("mock:error")
>> >  .end()
>> >  .process(new TestProcessor())
>> >  .to("mock:end");
>> >
>> >
>> >
>> >
>>
>
>
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> This email is confidential to the intended recipient. If you have received it in error, please notify the sender and delete it from your
> system. Any unauthorized use, disclosure or copying is not permitted. The views or opinions presented are solely those of the sender and do
> not necessarily represent those of Public Library of Science unless otherwise specifically stated. Please note that neither Public Library
> of Science nor any of its agents accept any responsibility for any viruses that may be contained in this e-mail or its attachments and it
> is your responsibility to scan the e-mail and attachments (if any).
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Message blocks route until all redelivery attempts are exhausted

Posted by Dragisa Krsmanovic <dk...@plos.org>.
Yes, but this only increases number of messages that are needed to
"block" the route. What if I have thousands of messages and somewhere
between 1 and 1000 can fail ? I don't want "good" messages to wait for
"bad" messages to exhaust their retries.

If I would set concurrentConsumers=2000 that can potentially spawn 2000
threads ? 

I tried a solution by sending bad messages to another, "waiting" route.
But, although good messages are not blocked any more, I still have
similar issue in the "waiting" route where I want to use delay. It can
only delay number of messages that is equal to concurrentConsumers. So,
"bad" messages at the end of the wait queue would wait, not delay() but
(n/concurrentConsumers) * delay() amount of time.



On Sat, 2010-04-03 at 10:10 +0800, Willem Jiang wrote: 
> Hi,
> 
> Can you try to add this option "concurrentConsumers=5" into the seda 
> endpoint's URI?
> By default there is only one thread to consumer the message in the queue.
> 
> Willem
> 
> Dragisa Krsmanovic wrote:
> > I have a route that is configured to re-deliver messages on exception.
> > If there are messages in that queue and one of them fails, all other
> > messages are blocked until all re-delivery attempts are exhausted.
> > 
> > I would like other messages to go through while the ones that failed are
> > waiting for their re-delivery. Message order is not important. What is a
> > good way to do this ?
> > 
> > 
> > The route I tested with looks like this:
> > 
> > from("seda:start")
> >  .onException(Exception.class)
> >    .handled(true)
> >    .redeliverDelay(100)
> >    .maximumRedeliveries(1)
> >    .to("mock:error")
> >  .end()
> >  .process(new TestProcessor())
> >  .to("mock:end");
> > 
> > 
> > 
> > 
> 


--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
This email is confidential to the intended recipient. If you have received it in error, please notify the sender and delete it from your
system. Any unauthorized use, disclosure or copying is not permitted. The views or opinions presented are solely those of the sender and do
not necessarily represent those of Public Library of Science unless otherwise specifically stated. Please note that neither Public Library
of Science nor any of its agents accept any responsibility for any viruses that may be contained in this e-mail or its attachments and it
is your responsibility to scan the e-mail and attachments (if any).

Re: Message blocks route until all redelivery attempts are exhausted

Posted by Willem Jiang <wi...@gmail.com>.
Hi,

Can you try to add this option "concurrentConsumers=5" into the seda 
endpoint's URI?
By default there is only one thread to consumer the message in the queue.

Willem

Dragisa Krsmanovic wrote:
> I have a route that is configured to re-deliver messages on exception.
> If there are messages in that queue and one of them fails, all other
> messages are blocked until all re-delivery attempts are exhausted.
> 
> I would like other messages to go through while the ones that failed are
> waiting for their re-delivery. Message order is not important. What is a
> good way to do this ?
> 
> 
> The route I tested with looks like this:
> 
> from("seda:start")
>  .onException(Exception.class)
>    .handled(true)
>    .redeliverDelay(100)
>    .maximumRedeliveries(1)
>    .to("mock:error")
>  .end()
>  .process(new TestProcessor())
>  .to("mock:end");
> 
> 
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> This email is confidential to the intended recipient. If you have received it in error, please notify the sender and delete it from your
> system. Any unauthorized use, disclosure or copying is not permitted. The views or opinions presented are solely those of the sender and do
> not necessarily represent those of Public Library of Science unless otherwise specifically stated. Please note that neither Public Library
> of Science nor any of its agents accept any responsibility for any viruses that may be contained in this e-mail or its attachments and it
> is your responsibility to scan the e-mail and attachments (if any).
>