You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Zhemzhitsky Sergey <Se...@troika.ru> on 2011/12/09 07:45:13 UTC

Is asyncDelayedRedelivery really async?

Hi guys,

Could you please explain how asyncDelayedRedelivery of redelivery policies really works, because I have not to block the route when some exchange fails and it should be redelivered. So I’m a little bit confused of how to use async. redelivery properly with different types of endpoints.

I have a route that that have timer endpoint as its source. Events are fired every second.
I also configured executor service with 10 threads for my error handler.

The 1st  and the 4th sent messages are marked to be always redelivered.
So I expected the 6th sent message (4th arrived) to be arrived no later than 2 seconds (time fires every second) after the 3rd sent message (2nd arrived), because the redelivery is asynchronous. However my test fails because the redelivery of the 1st message happens synchronously and the 6th sent message arrives about 6 seconds  after the 3rd sent message.

I also tried to use seda endpoint. It seems to work fine until I set waitForTaskToComplete parameter to Always. In that case the redelivery is synchronous too.

With quartz endpoints everything depends on the size of the quartz thread pool, so if its size is more than 1 and redelivery is synchronous quartz continues to fire events until thread pool is exhausted. With async. redelivery the behavior is the same except that the thread that is used  for redelivery differs from the calling thread.

From all described above I understood that calling thread always waits for the exchange to complete even if the redelivery is should happen asynchronously.


// BELOW IS MY UNIT TEST

package org.foo.bar;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.naming.Context;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.quartz.QuartzComponent;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class AsyncDelayedRedeliveryTest extends CamelTestSupport {

    private static final int MAXIMUM_REDELIVERIES = 1;

    @Test
    public void asyncRedeliveryTimer() throws Exception {
        context().addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                errorHandler(defaultErrorHandler().executorServiceRef("executorService"));

                onException(Exception.class)
                    .redeliveryPolicyRef("redeliveryPolicy")
                    .handled(true)
                    .onRedelivery(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("Redelivered : " + exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
                        }
                    })
                    .to("mock:exception");

                from("timer:start?repeatCount=10&period=1000")
//                from("quartz:start?trigger.repeatCount=9&trigger.repeatInterval=1000&trigger.misfireInstruction=2")
//                    .to("seda:next?waitForTaskToComplete=Never&timeout=100");

//                from("seda:next?concurrentConsumers=2&size=2")
                    .process(new Processor() {
                        private AtomicInteger counter = new AtomicInteger();

                        @Override
                        public void process(Exchange exchange) throws Exception {
                            if (counter.compareAndSet(0, 1) || counter.compareAndSet(3, 4)) {
                                exchange.setProperty("ThrowException", Boolean.TRUE);
                                exchange.getIn().setBody(counter.get() - 1);
                            } else {
                                exchange.getIn().setBody(counter.getAndIncrement());
                            }
                        }
                    })
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            if(Boolean.TRUE.equals(exchange.getProperty("ThrowException", Boolean.class))) {
                                throw new RuntimeException("Test Exception!");
                            }
                        }
                    })
                    .to("mock:result");
            }
        });

        MockEndpoint result = getMockEndpoint("mock:result");
        result.whenAnyExchangeReceived(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println("Message :     " + exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
            }
        });
        result.expectedMessageCount(8);
        result.allMessages().property("ThrowException").isNull();

        // the 1th and 4th sent messages are always redelivered,
        // so the 4th delivered message is the 6th sent message
        result.message(3).body().isEqualTo(5);
        // as we're trying to use async redelivery we're expecting that
        // the 6th sent message (4th arrived) must arrive no later than 3rd send message (2nd arrived)
        result.message(3).arrives().noLaterThan(2).seconds().afterPrevious();
        // the same as the prev. assertion
        result.message(2).arrives().noLaterThan(2).seconds().beforeNext();

        MockEndpoint exception = getMockEndpoint("mock:exception");
        exception.expectedMessageCount(2);
        exception.allMessages().property("ThrowException").isNotNull();

        startCamelContext();

        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
    }

    @Override
    protected Context createJndiContext() throws Exception {
        Context jndiContext = super.createJndiContext();

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setAsyncDelayedRedelivery(false);
        redeliveryPolicy.setLogRetryAttempted(true);
        redeliveryPolicy.setLogExhausted(false);
        redeliveryPolicy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
        redeliveryPolicy.setRedeliveryDelay(5000);
        jndiContext.bind("redeliveryPolicy", redeliveryPolicy);

        Properties props = new Properties();
        props.setProperty("org.quartz.scheduler.instanceName", "DefaultQuartzScheduler");
        props.setProperty("org.quartz.scheduler.rmi.export", "false");
        props.setProperty("org.quartz.scheduler.rmi.proxy", "false");
        props.setProperty("org.quartz.scheduler.wrapJobExecutionInUserTransaction", "false");
        props.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        props.setProperty("org.quartz.threadPool.threadCount", "5");
        props.setProperty("org.quartz.threadPool.threadPriority", "5");
        props.setProperty("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", "true");
        props.setProperty("org.quartz.jobStore.misfireThreshold", "1");
        props.setProperty("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
        props.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");

        QuartzComponent quartz = new QuartzComponent();
        quartz.setProperties(props);
        jndiContext.bind("quartz", quartz);

        ExecutorService executors = Executors.newScheduledThreadPool(10);
        jndiContext.bind("executorService", executors);

        return jndiContext;
    }

    @Override
    public boolean isUseRouteBuilder() {
        return false;
    }

}

Best Regards,
Sergey

_______________________________________________________

The information contained in this message may be privileged and conf idential and protected from disclosure. If you are not the original intended recipient, you are hereby notified that any review, retransmission, dissemination, or other use of, or taking of any action in reliance upon, this information is prohibited. If you have received this communication in error, please notify the sender immediately by replying to this message and delete it from your computer. Thank you for your cooperation. Troika Dialog, Russia. 
If you need assistance please contact our Contact Center  (+7495) 258 0500 or go to www.troika.ru/eng/Contacts/system.wbp  


Re: Is asyncDelayedRedelivery really async?

Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Jan 24, 2012 at 8:22 PM, Masters, Bill
<Bi...@centurylink.com> wrote:
> Hi Claus.
>
> I looked at http://camel.apache.org/asynchronous-routing-engine.html.
> I'm trying to implement a route that reads a file and transfers it via SFTP.
> If the destination is down for maintenance, I'd like to retry the transfer later (say 15 minutes) up to 4 times but without blocking the threads.
> It looks like SFTP is not on the list of components that support asynch routing engine. True?
> If so, what is the right way to implement asyncDelayedRedelivery?
>

That doesn't matter for producers whether they support natively async
or not, when you are doing redeliveries.
As Camel is doing this. So from the producer point of view, its just
another processing, that happens 15 minutes later.

So you should be fine. If you configure the Camel error handler to use
async delayed, then no threads is blocked.


> Thanks
>
> Bill Masters
> Centurylink
>
>
> -----Original Message-----
> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
> Sent: Friday, December 09, 2011 5:27 AM
> To: users@camel.apache.org
> Subject: Re: Is asyncDelayedRedelivery really async?
>
> Yes its async, as it uses a scheduled thread pool, to schedule a time in the future, to wake up, and execute the redelivery.
>
> However as its using the async routing engine, then the components in play (eg most often the consumer) need to support async routing engine.
> You can see a list of supported here
> http://camel.apache.org/asynchronous-routing-engine.html
> And you can argue that seda support that as well.
>
> Some components do not support async routing by nature, and so the thread will block in the consumer, until the exchange is done.
>
>
> 2011/12/9 Zhemzhitsky Sergey <Se...@troika.ru>:
>> Hi guys,
>>
>> Could you please explain how asyncDelayedRedelivery of redelivery policies really works, because I have not to block the route when some exchange fails and it should be redelivered. So I'm a little bit confused of how to use async. redelivery properly with different types of endpoints.
>>
>> I have a route that that have timer endpoint as its source. Events are fired every second.
>> I also configured executor service with 10 threads for my error handler.
>>
>> The 1st  and the 4th sent messages are marked to be always redelivered.
>> So I expected the 6th sent message (4th arrived) to be arrived no later than 2 seconds (time fires every second) after the 3rd sent message (2nd arrived), because the redelivery is asynchronous. However my test fails because the redelivery of the 1st message happens synchronously and the 6th sent message arrives about 6 seconds  after the 3rd sent message.
>>
>> I also tried to use seda endpoint. It seems to work fine until I set waitForTaskToComplete parameter to Always. In that case the redelivery is synchronous too.
>>
>> With quartz endpoints everything depends on the size of the quartz thread pool, so if its size is more than 1 and redelivery is synchronous quartz continues to fire events until thread pool is exhausted. With async. redelivery the behavior is the same except that the thread that is used  for redelivery differs from the calling thread.
>>
>> From all described above I understood that calling thread always waits for the exchange to complete even if the redelivery is should happen asynchronously.
>>
>>
>> // BELOW IS MY UNIT TEST
>>
>> package org.foo.bar;
>>
>> import java.util.Properties;
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors; import
>> java.util.concurrent.TimeUnit; import
>> java.util.concurrent.atomic.AtomicInteger;
>>
>> import javax.naming.Context;
>>
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.component.mock.MockEndpoint;
>> import org.apache.camel.component.quartz.QuartzComponent;
>> import org.apache.camel.processor.RedeliveryPolicy;
>> import org.apache.camel.test.junit4.CamelTestSupport;
>> import org.junit.Test;
>>
>> public class AsyncDelayedRedeliveryTest extends CamelTestSupport {
>>
>>    private static final int MAXIMUM_REDELIVERIES = 1;
>>
>>    @Test
>>    public void asyncRedeliveryTimer() throws Exception {
>>        context().addRoutes(new RouteBuilder() {
>>            @Override
>>            public void configure() throws Exception {
>>
>> errorHandler(defaultErrorHandler().executorServiceRef("executorService
>> "));
>>
>>                onException(Exception.class)
>>                    .redeliveryPolicyRef("redeliveryPolicy")
>>                    .handled(true)
>>                    .onRedelivery(new Processor() {
>>                        @Override
>>                        public void process(Exchange exchange) throws
>> Exception {
>>                            System.out.println("Redelivered : " +
>> exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
>>                        }
>>                    })
>>                    .to("mock:exception");
>>
>>                from("timer:start?repeatCount=10&period=1000")
>> //
>> from("quartz:start?trigger.repeatCount=9&trigger.repeatInterval=1000&t
>> rigger.misfireInstruction=2") //
>> .to("seda:next?waitForTaskToComplete=Never&timeout=100");
>>
>> //                from("seda:next?concurrentConsumers=2&size=2")
>>                    .process(new Processor() {
>>                        private AtomicInteger counter = new
>> AtomicInteger();
>>
>>                        @Override
>>                        public void process(Exchange exchange) throws
>> Exception {
>>                            if (counter.compareAndSet(0, 1) ||
>> counter.compareAndSet(3, 4)) {
>>                                exchange.setProperty("ThrowException",
>> Boolean.TRUE);
>>                                exchange.getIn().setBody(counter.get()
>> - 1);
>>                            } else {
>>
>> exchange.getIn().setBody(counter.getAndIncrement());
>>                            }
>>                        }
>>                    })
>>                    .process(new Processor() {
>>                        @Override
>>                        public void process(Exchange exchange) throws
>> Exception {
>>
>> if(Boolean.TRUE.equals(exchange.getProperty("ThrowException",
>> Boolean.class))) {
>>                                throw new RuntimeException("Test
>> Exception!");
>>                            }
>>                        }
>>                    })
>>                    .to("mock:result");
>>            }
>>        });
>>
>>        MockEndpoint result = getMockEndpoint("mock:result");
>>        result.whenAnyExchangeReceived(new Processor() {
>>            @Override
>>            public void process(Exchange exchange) throws Exception {
>>                System.out.println("Message :     " +
>> exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
>>            }
>>        });
>>        result.expectedMessageCount(8);
>>        result.allMessages().property("ThrowException").isNull();
>>
>>        // the 1th and 4th sent messages are always redelivered,
>>        // so the 4th delivered message is the 6th sent message
>>        result.message(3).body().isEqualTo(5);
>>        // as we're trying to use async redelivery we're expecting that
>>        // the 6th sent message (4th arrived) must arrive no later than
>> 3rd send message (2nd arrived)
>>
>> result.message(3).arrives().noLaterThan(2).seconds().afterPrevious();
>>        // the same as the prev. assertion
>>
>> result.message(2).arrives().noLaterThan(2).seconds().beforeNext();
>>
>>        MockEndpoint exception = getMockEndpoint("mock:exception");
>>        exception.expectedMessageCount(2);
>>        exception.allMessages().property("ThrowException").isNotNull();
>>
>>        startCamelContext();
>>
>>        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
>>    }
>>
>>    @Override
>>    protected Context createJndiContext() throws Exception {
>>        Context jndiContext = super.createJndiContext();
>>
>>        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
>>        redeliveryPolicy.setAsyncDelayedRedelivery(false);
>>        redeliveryPolicy.setLogRetryAttempted(true);
>>        redeliveryPolicy.setLogExhausted(false);
>>        redeliveryPolicy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
>>        redeliveryPolicy.setRedeliveryDelay(5000);
>>        jndiContext.bind("redeliveryPolicy", redeliveryPolicy);
>>
>>        Properties props = new Properties();
>>        props.setProperty("org.quartz.scheduler.instanceName",
>> "DefaultQuartzScheduler");
>>        props.setProperty("org.quartz.scheduler.rmi.export", "false");
>>        props.setProperty("org.quartz.scheduler.rmi.proxy", "false");
>>
>> props.setProperty("org.quartz.scheduler.wrapJobExecutionInUserTransact
>> ion", "false");
>>        props.setProperty("org.quartz.threadPool.class",
>> "org.quartz.simpl.SimpleThreadPool");
>>        props.setProperty("org.quartz.threadPool.threadCount", "5");
>>        props.setProperty("org.quartz.threadPool.threadPriority", "5");
>>
>> props.setProperty("org.quartz.threadPool.threadsInheritContextClassLoa
>> derOfInitializingThread", "true");
>>        props.setProperty("org.quartz.jobStore.misfireThreshold", "1");
>>        props.setProperty("org.quartz.jobStore.class",
>> "org.quartz.simpl.RAMJobStore");
>>        props.setProperty("org.quartz.scheduler.skipUpdateCheck",
>> "true");
>>
>>        QuartzComponent quartz = new QuartzComponent();
>>        quartz.setProperties(props);
>>        jndiContext.bind("quartz", quartz);
>>
>>        ExecutorService executors =
>> Executors.newScheduledThreadPool(10);
>>        jndiContext.bind("executorService", executors);
>>
>>        return jndiContext;
>>    }
>>
>>    @Override
>>    public boolean isUseRouteBuilder() {
>>        return false;
>>    }
>>
>> }
>>
>> Best Regards,
>> Sergey
>>
>> _______________________________________________________
>>
>> The information contained in this message may be privileged and conf idential and protected from disclosure. If you are not the original intended recipient, you are hereby notified that any review, retransmission, dissemination, or other use of, or taking of any action in reliance upon, this information is prohibited. If you have received this communication in error, please notify the sender immediately by replying to this message and delete it from your computer. Thank you for your cooperation. Troika Dialog, Russia.
>> If you need assistance please contact our Contact Center  (+7495) 258
>> 0500 or go to www.troika.ru/eng/Contacts/system.wbp
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
>
>
> This communication is the property of CenturyLink and may contain confidential or privileged information. Unauthorized use of this communication is strictly
> prohibited and may be unlawful.  If you have received this communication
> in error, please immediately notify the sender by reply e-mail and destroy
> all copies of the communication and any attachments.



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

RE: Is asyncDelayedRedelivery really async?

Posted by "Masters, Bill" <Bi...@CenturyLink.com>.
Hi Claus.

I looked at http://camel.apache.org/asynchronous-routing-engine.html.
I'm trying to implement a route that reads a file and transfers it via SFTP.
If the destination is down for maintenance, I'd like to retry the transfer later (say 15 minutes) up to 4 times but without blocking the threads.
It looks like SFTP is not on the list of components that support asynch routing engine. True?
If so, what is the right way to implement asyncDelayedRedelivery?

Thanks

Bill Masters
Centurylink


-----Original Message-----
From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
Sent: Friday, December 09, 2011 5:27 AM
To: users@camel.apache.org
Subject: Re: Is asyncDelayedRedelivery really async?

Yes its async, as it uses a scheduled thread pool, to schedule a time in the future, to wake up, and execute the redelivery.

However as its using the async routing engine, then the components in play (eg most often the consumer) need to support async routing engine.
You can see a list of supported here
http://camel.apache.org/asynchronous-routing-engine.html
And you can argue that seda support that as well.

Some components do not support async routing by nature, and so the thread will block in the consumer, until the exchange is done.


2011/12/9 Zhemzhitsky Sergey <Se...@troika.ru>:
> Hi guys,
>
> Could you please explain how asyncDelayedRedelivery of redelivery policies really works, because I have not to block the route when some exchange fails and it should be redelivered. So I'm a little bit confused of how to use async. redelivery properly with different types of endpoints.
>
> I have a route that that have timer endpoint as its source. Events are fired every second.
> I also configured executor service with 10 threads for my error handler.
>
> The 1st  and the 4th sent messages are marked to be always redelivered.
> So I expected the 6th sent message (4th arrived) to be arrived no later than 2 seconds (time fires every second) after the 3rd sent message (2nd arrived), because the redelivery is asynchronous. However my test fails because the redelivery of the 1st message happens synchronously and the 6th sent message arrives about 6 seconds  after the 3rd sent message.
>
> I also tried to use seda endpoint. It seems to work fine until I set waitForTaskToComplete parameter to Always. In that case the redelivery is synchronous too.
>
> With quartz endpoints everything depends on the size of the quartz thread pool, so if its size is more than 1 and redelivery is synchronous quartz continues to fire events until thread pool is exhausted. With async. redelivery the behavior is the same except that the thread that is used  for redelivery differs from the calling thread.
>
> From all described above I understood that calling thread always waits for the exchange to complete even if the redelivery is should happen asynchronously.
>
>
> // BELOW IS MY UNIT TEST
>
> package org.foo.bar;
>
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors; import
> java.util.concurrent.TimeUnit; import
> java.util.concurrent.atomic.AtomicInteger;
>
> import javax.naming.Context;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.component.quartz.QuartzComponent;
> import org.apache.camel.processor.RedeliveryPolicy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
>
> public class AsyncDelayedRedeliveryTest extends CamelTestSupport {
>
>    private static final int MAXIMUM_REDELIVERIES = 1;
>
>    @Test
>    public void asyncRedeliveryTimer() throws Exception {
>        context().addRoutes(new RouteBuilder() {
>            @Override
>            public void configure() throws Exception {
>
> errorHandler(defaultErrorHandler().executorServiceRef("executorService
> "));
>
>                onException(Exception.class)
>                    .redeliveryPolicyRef("redeliveryPolicy")
>                    .handled(true)
>                    .onRedelivery(new Processor() {
>                        @Override
>                        public void process(Exchange exchange) throws
> Exception {
>                            System.out.println("Redelivered : " +
> exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
>                        }
>                    })
>                    .to("mock:exception");
>
>                from("timer:start?repeatCount=10&period=1000")
> //
> from("quartz:start?trigger.repeatCount=9&trigger.repeatInterval=1000&t
> rigger.misfireInstruction=2") //
> .to("seda:next?waitForTaskToComplete=Never&timeout=100");
>
> //                from("seda:next?concurrentConsumers=2&size=2")
>                    .process(new Processor() {
>                        private AtomicInteger counter = new
> AtomicInteger();
>
>                        @Override
>                        public void process(Exchange exchange) throws
> Exception {
>                            if (counter.compareAndSet(0, 1) ||
> counter.compareAndSet(3, 4)) {
>                                exchange.setProperty("ThrowException",
> Boolean.TRUE);
>                                exchange.getIn().setBody(counter.get()
> - 1);
>                            } else {
>
> exchange.getIn().setBody(counter.getAndIncrement());
>                            }
>                        }
>                    })
>                    .process(new Processor() {
>                        @Override
>                        public void process(Exchange exchange) throws
> Exception {
>
> if(Boolean.TRUE.equals(exchange.getProperty("ThrowException",
> Boolean.class))) {
>                                throw new RuntimeException("Test
> Exception!");
>                            }
>                        }
>                    })
>                    .to("mock:result");
>            }
>        });
>
>        MockEndpoint result = getMockEndpoint("mock:result");
>        result.whenAnyExchangeReceived(new Processor() {
>            @Override
>            public void process(Exchange exchange) throws Exception {
>                System.out.println("Message :     " +
> exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
>            }
>        });
>        result.expectedMessageCount(8);
>        result.allMessages().property("ThrowException").isNull();
>
>        // the 1th and 4th sent messages are always redelivered,
>        // so the 4th delivered message is the 6th sent message
>        result.message(3).body().isEqualTo(5);
>        // as we're trying to use async redelivery we're expecting that
>        // the 6th sent message (4th arrived) must arrive no later than
> 3rd send message (2nd arrived)
>
> result.message(3).arrives().noLaterThan(2).seconds().afterPrevious();
>        // the same as the prev. assertion
>
> result.message(2).arrives().noLaterThan(2).seconds().beforeNext();
>
>        MockEndpoint exception = getMockEndpoint("mock:exception");
>        exception.expectedMessageCount(2);
>        exception.allMessages().property("ThrowException").isNotNull();
>
>        startCamelContext();
>
>        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
>    }
>
>    @Override
>    protected Context createJndiContext() throws Exception {
>        Context jndiContext = super.createJndiContext();
>
>        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
>        redeliveryPolicy.setAsyncDelayedRedelivery(false);
>        redeliveryPolicy.setLogRetryAttempted(true);
>        redeliveryPolicy.setLogExhausted(false);
>        redeliveryPolicy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
>        redeliveryPolicy.setRedeliveryDelay(5000);
>        jndiContext.bind("redeliveryPolicy", redeliveryPolicy);
>
>        Properties props = new Properties();
>        props.setProperty("org.quartz.scheduler.instanceName",
> "DefaultQuartzScheduler");
>        props.setProperty("org.quartz.scheduler.rmi.export", "false");
>        props.setProperty("org.quartz.scheduler.rmi.proxy", "false");
>
> props.setProperty("org.quartz.scheduler.wrapJobExecutionInUserTransact
> ion", "false");
>        props.setProperty("org.quartz.threadPool.class",
> "org.quartz.simpl.SimpleThreadPool");
>        props.setProperty("org.quartz.threadPool.threadCount", "5");
>        props.setProperty("org.quartz.threadPool.threadPriority", "5");
>
> props.setProperty("org.quartz.threadPool.threadsInheritContextClassLoa
> derOfInitializingThread", "true");
>        props.setProperty("org.quartz.jobStore.misfireThreshold", "1");
>        props.setProperty("org.quartz.jobStore.class",
> "org.quartz.simpl.RAMJobStore");
>        props.setProperty("org.quartz.scheduler.skipUpdateCheck",
> "true");
>
>        QuartzComponent quartz = new QuartzComponent();
>        quartz.setProperties(props);
>        jndiContext.bind("quartz", quartz);
>
>        ExecutorService executors =
> Executors.newScheduledThreadPool(10);
>        jndiContext.bind("executorService", executors);
>
>        return jndiContext;
>    }
>
>    @Override
>    public boolean isUseRouteBuilder() {
>        return false;
>    }
>
> }
>
> Best Regards,
> Sergey
>
> _______________________________________________________
>
> The information contained in this message may be privileged and conf idential and protected from disclosure. If you are not the original intended recipient, you are hereby notified that any review, retransmission, dissemination, or other use of, or taking of any action in reliance upon, this information is prohibited. If you have received this communication in error, please notify the sender immediately by replying to this message and delete it from your computer. Thank you for your cooperation. Troika Dialog, Russia.
> If you need assistance please contact our Contact Center  (+7495) 258
> 0500 or go to www.troika.ru/eng/Contacts/system.wbp
>



--
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/


This communication is the property of CenturyLink and may contain confidential or privileged information. Unauthorized use of this communication is strictly
prohibited and may be unlawful.  If you have received this communication
in error, please immediately notify the sender by reply e-mail and destroy
all copies of the communication and any attachments.

RE: Is asyncDelayedRedelivery really async?

Posted by Zhemzhitsky Sergey <Se...@troika.ru>.
Hi Claus,

Thanks a lot for clarification.

Best Regards,
Sergey


-----Original Message-----
From: Claus Ibsen [mailto:claus.ibsen@gmail.com] 
Sent: Friday, December 09, 2011 4:27 PM
To: users@camel.apache.org
Subject: Re: Is asyncDelayedRedelivery really async?

Yes its async, as it uses a scheduled thread pool, to schedule a time in the future, to wake up, and execute the redelivery.

However as its using the async routing engine, then the components in play (eg most often the consumer) need to support async routing engine.
You can see a list of supported here
http://camel.apache.org/asynchronous-routing-engine.html
And you can argue that seda support that as well.

Some components do not support async routing by nature, and so the thread will block in the consumer, until the exchange is done.


2011/12/9 Zhemzhitsky Sergey <Se...@troika.ru>:
> Hi guys,
>
> Could you please explain how asyncDelayedRedelivery of redelivery policies really works, because I have not to block the route when some exchange fails and it should be redelivered. So I’m a little bit confused of how to use async. redelivery properly with different types of endpoints.
>
> I have a route that that have timer endpoint as its source. Events are fired every second.
> I also configured executor service with 10 threads for my error handler.
>
> The 1st  and the 4th sent messages are marked to be always redelivered.
> So I expected the 6th sent message (4th arrived) to be arrived no later than 2 seconds (time fires every second) after the 3rd sent message (2nd arrived), because the redelivery is asynchronous. However my test fails because the redelivery of the 1st message happens synchronously and the 6th sent message arrives about 6 seconds  after the 3rd sent message.
>
> I also tried to use seda endpoint. It seems to work fine until I set waitForTaskToComplete parameter to Always. In that case the redelivery is synchronous too.
>
> With quartz endpoints everything depends on the size of the quartz thread pool, so if its size is more than 1 and redelivery is synchronous quartz continues to fire events until thread pool is exhausted. With async. redelivery the behavior is the same except that the thread that is used  for redelivery differs from the calling thread.
>
> From all described above I understood that calling thread always waits for the exchange to complete even if the redelivery is should happen asynchronously.
>
>
> // BELOW IS MY UNIT TEST
>
> package org.foo.bar;
>
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors; import 
> java.util.concurrent.TimeUnit; import 
> java.util.concurrent.atomic.AtomicInteger;
>
> import javax.naming.Context;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.component.quartz.QuartzComponent;
> import org.apache.camel.processor.RedeliveryPolicy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
>
> public class AsyncDelayedRedeliveryTest extends CamelTestSupport {
>
>    private static final int MAXIMUM_REDELIVERIES = 1;
>
>    @Test
>    public void asyncRedeliveryTimer() throws Exception {
>        context().addRoutes(new RouteBuilder() {
>            @Override
>            public void configure() throws Exception {
>                
> errorHandler(defaultErrorHandler().executorServiceRef("executorService
> "));
>
>                onException(Exception.class)
>                    .redeliveryPolicyRef("redeliveryPolicy")
>                    .handled(true)
>                    .onRedelivery(new Processor() {
>                        @Override
>                        public void process(Exchange exchange) throws 
> Exception {
>                            System.out.println("Redelivered : " + 
> exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
>                        }
>                    })
>                    .to("mock:exception");
>
>                from("timer:start?repeatCount=10&period=1000")
> //                
> from("quartz:start?trigger.repeatCount=9&trigger.repeatInterval=1000&t
> rigger.misfireInstruction=2") //                    
> .to("seda:next?waitForTaskToComplete=Never&timeout=100");
>
> //                from("seda:next?concurrentConsumers=2&size=2")
>                    .process(new Processor() {
>                        private AtomicInteger counter = new 
> AtomicInteger();
>
>                        @Override
>                        public void process(Exchange exchange) throws 
> Exception {
>                            if (counter.compareAndSet(0, 1) || 
> counter.compareAndSet(3, 4)) {
>                                exchange.setProperty("ThrowException", 
> Boolean.TRUE);
>                                exchange.getIn().setBody(counter.get() 
> - 1);
>                            } else {
>                                
> exchange.getIn().setBody(counter.getAndIncrement());
>                            }
>                        }
>                    })
>                    .process(new Processor() {
>                        @Override
>                        public void process(Exchange exchange) throws 
> Exception {
>                            
> if(Boolean.TRUE.equals(exchange.getProperty("ThrowException", 
> Boolean.class))) {
>                                throw new RuntimeException("Test 
> Exception!");
>                            }
>                        }
>                    })
>                    .to("mock:result");
>            }
>        });
>
>        MockEndpoint result = getMockEndpoint("mock:result");
>        result.whenAnyExchangeReceived(new Processor() {
>            @Override
>            public void process(Exchange exchange) throws Exception {
>                System.out.println("Message :     " + 
> exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
>            }
>        });
>        result.expectedMessageCount(8);
>        result.allMessages().property("ThrowException").isNull();
>
>        // the 1th and 4th sent messages are always redelivered,
>        // so the 4th delivered message is the 6th sent message
>        result.message(3).body().isEqualTo(5);
>        // as we're trying to use async redelivery we're expecting that
>        // the 6th sent message (4th arrived) must arrive no later than 
> 3rd send message (2nd arrived)
>        
> result.message(3).arrives().noLaterThan(2).seconds().afterPrevious();
>        // the same as the prev. assertion
>        
> result.message(2).arrives().noLaterThan(2).seconds().beforeNext();
>
>        MockEndpoint exception = getMockEndpoint("mock:exception");
>        exception.expectedMessageCount(2);
>        exception.allMessages().property("ThrowException").isNotNull();
>
>        startCamelContext();
>
>        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
>    }
>
>    @Override
>    protected Context createJndiContext() throws Exception {
>        Context jndiContext = super.createJndiContext();
>
>        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
>        redeliveryPolicy.setAsyncDelayedRedelivery(false);
>        redeliveryPolicy.setLogRetryAttempted(true);
>        redeliveryPolicy.setLogExhausted(false);
>        redeliveryPolicy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
>        redeliveryPolicy.setRedeliveryDelay(5000);
>        jndiContext.bind("redeliveryPolicy", redeliveryPolicy);
>
>        Properties props = new Properties();
>        props.setProperty("org.quartz.scheduler.instanceName", 
> "DefaultQuartzScheduler");
>        props.setProperty("org.quartz.scheduler.rmi.export", "false");
>        props.setProperty("org.quartz.scheduler.rmi.proxy", "false");
>        
> props.setProperty("org.quartz.scheduler.wrapJobExecutionInUserTransact
> ion", "false");
>        props.setProperty("org.quartz.threadPool.class", 
> "org.quartz.simpl.SimpleThreadPool");
>        props.setProperty("org.quartz.threadPool.threadCount", "5");
>        props.setProperty("org.quartz.threadPool.threadPriority", "5");
>        
> props.setProperty("org.quartz.threadPool.threadsInheritContextClassLoa
> derOfInitializingThread", "true");
>        props.setProperty("org.quartz.jobStore.misfireThreshold", "1");
>        props.setProperty("org.quartz.jobStore.class", 
> "org.quartz.simpl.RAMJobStore");
>        props.setProperty("org.quartz.scheduler.skipUpdateCheck", 
> "true");
>
>        QuartzComponent quartz = new QuartzComponent();
>        quartz.setProperties(props);
>        jndiContext.bind("quartz", quartz);
>
>        ExecutorService executors = 
> Executors.newScheduledThreadPool(10);
>        jndiContext.bind("executorService", executors);
>
>        return jndiContext;
>    }
>
>    @Override
>    public boolean isUseRouteBuilder() {
>        return false;
>    }
>
> }
>
> Best Regards,
> Sergey
>
> _______________________________________________________
>
> The information contained in this message may be privileged and conf idential and protected from disclosure. If you are not the original intended recipient, you are hereby notified that any review, retransmission, dissemination, or other use of, or taking of any action in reliance upon, this information is prohibited. If you have received this communication in error, please notify the sender immediately by replying to this message and delete it from your computer. Thank you for your cooperation. Troika Dialog, Russia.
> If you need assistance please contact our Contact Center  (+7495) 258 
> 0500 or go to www.troika.ru/eng/Contacts/system.wbp
>



--
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: Is asyncDelayedRedelivery really async?

Posted by Claus Ibsen <cl...@gmail.com>.
Yes its async, as it uses a scheduled thread pool, to schedule a time
in the future, to wake up, and execute the redelivery.

However as its using the async routing engine, then the components in
play (eg most often the consumer) need to support async routing
engine.
You can see a list of supported here
http://camel.apache.org/asynchronous-routing-engine.html
And you can argue that seda support that as well.

Some components do not support async routing by nature, and so the
thread will block in the consumer, until the exchange is done.


2011/12/9 Zhemzhitsky Sergey <Se...@troika.ru>:
> Hi guys,
>
> Could you please explain how asyncDelayedRedelivery of redelivery policies really works, because I have not to block the route when some exchange fails and it should be redelivered. So I’m a little bit confused of how to use async. redelivery properly with different types of endpoints.
>
> I have a route that that have timer endpoint as its source. Events are fired every second.
> I also configured executor service with 10 threads for my error handler.
>
> The 1st  and the 4th sent messages are marked to be always redelivered.
> So I expected the 6th sent message (4th arrived) to be arrived no later than 2 seconds (time fires every second) after the 3rd sent message (2nd arrived), because the redelivery is asynchronous. However my test fails because the redelivery of the 1st message happens synchronously and the 6th sent message arrives about 6 seconds  after the 3rd sent message.
>
> I also tried to use seda endpoint. It seems to work fine until I set waitForTaskToComplete parameter to Always. In that case the redelivery is synchronous too.
>
> With quartz endpoints everything depends on the size of the quartz thread pool, so if its size is more than 1 and redelivery is synchronous quartz continues to fire events until thread pool is exhausted. With async. redelivery the behavior is the same except that the thread that is used  for redelivery differs from the calling thread.
>
> From all described above I understood that calling thread always waits for the exchange to complete even if the redelivery is should happen asynchronously.
>
>
> // BELOW IS MY UNIT TEST
>
> package org.foo.bar;
>
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import javax.naming.Context;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.component.quartz.QuartzComponent;
> import org.apache.camel.processor.RedeliveryPolicy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
>
> public class AsyncDelayedRedeliveryTest extends CamelTestSupport {
>
>    private static final int MAXIMUM_REDELIVERIES = 1;
>
>    @Test
>    public void asyncRedeliveryTimer() throws Exception {
>        context().addRoutes(new RouteBuilder() {
>            @Override
>            public void configure() throws Exception {
>                errorHandler(defaultErrorHandler().executorServiceRef("executorService"));
>
>                onException(Exception.class)
>                    .redeliveryPolicyRef("redeliveryPolicy")
>                    .handled(true)
>                    .onRedelivery(new Processor() {
>                        @Override
>                        public void process(Exchange exchange) throws Exception {
>                            System.out.println("Redelivered : " + exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
>                        }
>                    })
>                    .to("mock:exception");
>
>                from("timer:start?repeatCount=10&period=1000")
> //                from("quartz:start?trigger.repeatCount=9&trigger.repeatInterval=1000&trigger.misfireInstruction=2")
> //                    .to("seda:next?waitForTaskToComplete=Never&timeout=100");
>
> //                from("seda:next?concurrentConsumers=2&size=2")
>                    .process(new Processor() {
>                        private AtomicInteger counter = new AtomicInteger();
>
>                        @Override
>                        public void process(Exchange exchange) throws Exception {
>                            if (counter.compareAndSet(0, 1) || counter.compareAndSet(3, 4)) {
>                                exchange.setProperty("ThrowException", Boolean.TRUE);
>                                exchange.getIn().setBody(counter.get() - 1);
>                            } else {
>                                exchange.getIn().setBody(counter.getAndIncrement());
>                            }
>                        }
>                    })
>                    .process(new Processor() {
>                        @Override
>                        public void process(Exchange exchange) throws Exception {
>                            if(Boolean.TRUE.equals(exchange.getProperty("ThrowException", Boolean.class))) {
>                                throw new RuntimeException("Test Exception!");
>                            }
>                        }
>                    })
>                    .to("mock:result");
>            }
>        });
>
>        MockEndpoint result = getMockEndpoint("mock:result");
>        result.whenAnyExchangeReceived(new Processor() {
>            @Override
>            public void process(Exchange exchange) throws Exception {
>                System.out.println("Message :     " + exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
>            }
>        });
>        result.expectedMessageCount(8);
>        result.allMessages().property("ThrowException").isNull();
>
>        // the 1th and 4th sent messages are always redelivered,
>        // so the 4th delivered message is the 6th sent message
>        result.message(3).body().isEqualTo(5);
>        // as we're trying to use async redelivery we're expecting that
>        // the 6th sent message (4th arrived) must arrive no later than 3rd send message (2nd arrived)
>        result.message(3).arrives().noLaterThan(2).seconds().afterPrevious();
>        // the same as the prev. assertion
>        result.message(2).arrives().noLaterThan(2).seconds().beforeNext();
>
>        MockEndpoint exception = getMockEndpoint("mock:exception");
>        exception.expectedMessageCount(2);
>        exception.allMessages().property("ThrowException").isNotNull();
>
>        startCamelContext();
>
>        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
>    }
>
>    @Override
>    protected Context createJndiContext() throws Exception {
>        Context jndiContext = super.createJndiContext();
>
>        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
>        redeliveryPolicy.setAsyncDelayedRedelivery(false);
>        redeliveryPolicy.setLogRetryAttempted(true);
>        redeliveryPolicy.setLogExhausted(false);
>        redeliveryPolicy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
>        redeliveryPolicy.setRedeliveryDelay(5000);
>        jndiContext.bind("redeliveryPolicy", redeliveryPolicy);
>
>        Properties props = new Properties();
>        props.setProperty("org.quartz.scheduler.instanceName", "DefaultQuartzScheduler");
>        props.setProperty("org.quartz.scheduler.rmi.export", "false");
>        props.setProperty("org.quartz.scheduler.rmi.proxy", "false");
>        props.setProperty("org.quartz.scheduler.wrapJobExecutionInUserTransaction", "false");
>        props.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
>        props.setProperty("org.quartz.threadPool.threadCount", "5");
>        props.setProperty("org.quartz.threadPool.threadPriority", "5");
>        props.setProperty("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", "true");
>        props.setProperty("org.quartz.jobStore.misfireThreshold", "1");
>        props.setProperty("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
>        props.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
>
>        QuartzComponent quartz = new QuartzComponent();
>        quartz.setProperties(props);
>        jndiContext.bind("quartz", quartz);
>
>        ExecutorService executors = Executors.newScheduledThreadPool(10);
>        jndiContext.bind("executorService", executors);
>
>        return jndiContext;
>    }
>
>    @Override
>    public boolean isUseRouteBuilder() {
>        return false;
>    }
>
> }
>
> Best Regards,
> Sergey
>
> _______________________________________________________
>
> The information contained in this message may be privileged and conf idential and protected from disclosure. If you are not the original intended recipient, you are hereby notified that any review, retransmission, dissemination, or other use of, or taking of any action in reliance upon, this information is prohibited. If you have received this communication in error, please notify the sender immediately by replying to this message and delete it from your computer. Thank you for your cooperation. Troika Dialog, Russia.
> If you need assistance please contact our Contact Center  (+7495) 258 0500 or go to www.troika.ru/eng/Contacts/system.wbp
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/