You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Babak Vahdat <ba...@swissonline.ch> on 2013/05/03 14:52:07 UTC

Re: WireTap doesn't process the messages in the order they arrived for each message

Hi,

The reason of the behavior you're observing is simply because of a current
bug in Camel itself as given your 2 routes below, the current logic creates
TWO instances of ExecutorService and not as you would expect ONE
ExecutorService object (each being passed to each of the 2 WireTapProcessor
in your route). Looking at
ProcessorDefinitionHelper#lookupExecutorServiceRef() the current logic looks
up ONLY inside the registry causing 2 ExecutorService being returned by each
invocation of this method.

One simple current workaround to get that lovely green lines inside your IDE
for your test would be to bind "executorService" into the Camel Registry, in
your case ApplicationContextRegistry, that's:

  <bean id="executorService" class="java.util.concurrent.Executors"
factory-method="newSingleThreadExecutor" destroy-method="shutdownNow" />

Would you mind to raise a ticket for this?

Babak


Christian Mueller wrote
> The following test fails randomly fails at message 6 up to 961 (on my
> machine). I consider this as an bug:
> 
> public class WireTapTest extends CamelSpringTestSupport {
>     
>     private int counter = 10000;
> 
>     @Test
>     public void test() throws InterruptedException {
>         getMockEndpoint("mock:result").expectedMessageCount(counter * 2);
> 
>         template.setDefaultEndpointUri("direct:start");
>         for (int i = 0; i < counter; i++) {
>             template.requestBody("Camel");
>         }
> 
>         assertMockEndpointsSatisfied();
>         
>         List
> <Exchange>
>  receivedExchanges =
> getMockEndpoint("mock:result").getReceivedExchanges();
>         for (int i = 0; i < receivedExchanges.size(); i++) {
>             System.out.println("check exchange number " + i);
> 
>             String body =
> receivedExchanges.get(i).getIn().getBody(String.class);
>             if (i % 2 == 0) {
>                 assertEquals("REQUEST", body);
>             } else {
>                 assertEquals("RESPONSE", body);
>             }
>         }
>     }
> 
>     @Override
>     protected AbstractApplicationContext createApplicationContext() {
>         return new ClassPathXmlApplicationContext("bundle-context.xml");
>     }
> }
> 
> which is using the following route:
> public class WireTapRouteBuilder extends RouteBuilder {
> 
>     @Override
>     public void configure() throws Exception {
>         from("direct:start")
>             .wireTap("seda:wireTap").executorServiceRef("executorService")
>             .setHeader("TYPE", constant("RESPONSE"))
>            
> .wireTap("seda:wireTap").executorServiceRef("executorService");
> 
>         from("seda:wireTap")
>             .choice()
>                 .when(header("TYPE").isNull())
>                     .setBody(constant("REQUEST"))
>                     .to("mock:result")
>                 .otherwise()
>                     .setBody(constant("RESPONSE"))
>                     .to("mock:result")
>             .end();
>     }
> }
> 
> and the following configuration:
> <beans xmlns="http://www.springframework.org/schema/beans"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>     xmlns:camel="http://camel.apache.org/schema/spring"
>     xsi:schemaLocation="
>         http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
>         http://camel.apache.org/schema/spring
> http://camel.apache.org/schema/spring/camel-spring.xsd">
>     
> <camel:camelContext id="com.xxx.xxxx.wireTapTest">
>         
> <camel:routeBuilder ref="wireTapRouteBuilder" />
>         
> <camel:threadPoolProfile id="executorService" defaultProfile="false"
> poolSize="1" maxPoolSize="1" maxQueueSize="10000"
> rejectedPolicy="Discard"/>
>     
> </camel:camelContext>
>     
> <bean id="wireTapRouteBuilder"
> class="com.xxx.xxxx.wiretap.WireTapRouteBuilder" />
> </beans>
> We use the wiretap to send messages to an endpoint which persist some key
> information about the execution in a database. It's important for us to
> persist the messages into the database in the same order as they arrive in
> the seda endpoint. Because of this, we configured the threadPoolProfile to
> only use one thread. But some times the messages receive in the reverse
> order in our database/mock endpoint.
> 
> Any suggestions what is wrong here?
> 
> Best,
> Christian





--
View this message in context: http://camel.465427.n5.nabble.com/WireTap-doesn-t-process-the-messages-in-the-order-they-arrived-for-each-message-tp5731733p5731969.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: WireTap doesn't process the messages in the order they arrived for each message

Posted by Babak Vahdat <ba...@swissonline.ch>.
Just for the record, Claus is ABSOLUTELY right with his solution as I simply
overlooked the typo (threadPoolProfile instead of threadPool tag) though the
proposed "solution" of mine would also do the trick :-)

Babak


Christian Mueller wrote
> Thank you guys for the response!
> 
> Best,
> Christian
> 
> 
> On Fri, May 3, 2013 at 3:38 PM, Babak Vahdat &lt;

> babak.vahdat@

> &gt;wrote:
> 
>> Yeah that was the cause of the behaviour threadPoolProfile instead of
>> threadProfile, sorry!
>>
>> Babak
>>
>> Am 03.05.2013 um 15:23 schrieb Claus Ibsen &lt;

> claus.ibsen@

> &gt;:
>>
>> > Babak well spotted. Though its not a bug.
>> >
>> > 
> <threadPoolProfile>
>  is a profile, (aka like a skeleton/template).
>> > Which you use to define options, when creating new thread pools.
>> >
>> > Use
>> > 
> <threadPool>
>> >
>> > if you want a single thread pool and share it between the 2 wire taps,
>> > then yeah Christian should use a 
> <threadPool>
>  instead.
>> >
>> > Mind that you can still define a 
> <threadPoolProfile>
>  and then a
>> > 
> <threadPool>
>  which can use the profile to define its basic options.
>> >
>> > See more details at
>> > http://camel.apache.org/threading-model.html
>> >
>> >
>> >
>> > On Fri, May 3, 2013 at 2:52 PM, Babak Vahdat
>> > &lt;

> babak.vahdat@

> &gt; wrote:
>> >> Hi,
>> >>
>> >> The reason of the behavior you're observing is simply because of a
>> current
>> >> bug in Camel itself as given your 2 routes below, the current logic
>> creates
>> >> TWO instances of ExecutorService and not as you would expect ONE
>> >> ExecutorService object (each being passed to each of the 2
>> WireTapProcessor
>> >> in your route). Looking at
>> >> ProcessorDefinitionHelper#lookupExecutorServiceRef() the current logic
>> looks
>> >> up ONLY inside the registry causing 2 ExecutorService being returned
>> by
>> each
>> >> invocation of this method.
>> >>
>> >> One simple current workaround to get that lovely green lines inside
>> your IDE
>> >> for your test would be to bind "executorService" into the Camel
>> Registry, in
>> >> your case ApplicationContextRegistry, that's:
>> >>
>> >>  
> <bean id="executorService" class="java.util.concurrent.Executors"
>>
>  >> factory-method="newSingleThreadExecutor" destroy-method="shutdownNow"
> />
>> >>
>> >> Would you mind to raise a ticket for this?
>> >>
>> >> Babak
>> >>
>> >>
>> >> Christian Mueller wrote
>> >>> The following test fails randomly fails at message 6 up to 961 (on my
>> >>> machine). I consider this as an bug:
>> >>>
>> >>> public class WireTapTest extends CamelSpringTestSupport {
>> >>>
>> >>>    private int counter = 10000;
>> >>>
>> >>>    @Test
>> >>>    public void test() throws InterruptedException {
>> >>>        getMockEndpoint("mock:result").expectedMessageCount(counter *
>> 2);
>> >>>
>> >>>        template.setDefaultEndpointUri("direct:start");
>> >>>        for (int i = 0; i < counter; i++) {
>> >>>            template.requestBody("Camel");
>> >>>        }
>> >>>
>> >>>        assertMockEndpointsSatisfied();
>> >>>
>> >>>        List
>> >>> 
> <Exchange>
>> >>> receivedExchanges =
>> >>> getMockEndpoint("mock:result").getReceivedExchanges();
>> >>>        for (int i = 0; i < receivedExchanges.size(); i++) {
>> >>>            System.out.println("check exchange number " + i);
>> >>>
>> >>>            String body =
>> >>> receivedExchanges.get(i).getIn().getBody(String.class);
>> >>>            if (i % 2 == 0) {
>> >>>                assertEquals("REQUEST", body);
>> >>>            } else {
>> >>>                assertEquals("RESPONSE", body);
>> >>>            }
>> >>>        }
>> >>>    }
>> >>>
>> >>>    @Override
>> >>>    protected AbstractApplicationContext createApplicationContext() {
>> >>>        return new
>> ClassPathXmlApplicationContext("bundle-context.xml");
>> >>>    }
>> >>> }
>> >>>
>> >>> which is using the following route:
>> >>> public class WireTapRouteBuilder extends RouteBuilder {
>> >>>
>> >>>    @Override
>> >>>    public void configure() throws Exception {
>> >>>        from("direct:start")
>> >>>
>>  .wireTap("seda:wireTap").executorServiceRef("executorService")
>> >>>            .setHeader("TYPE", constant("RESPONSE"))
>> >>>
>> >>> .wireTap("seda:wireTap").executorServiceRef("executorService");
>> >>>
>> >>>        from("seda:wireTap")
>> >>>            .choice()
>> >>>                .when(header("TYPE").isNull())
>> >>>                    .setBody(constant("REQUEST"))
>> >>>                    .to("mock:result")
>> >>>                .otherwise()
>> >>>                    .setBody(constant("RESPONSE"))
>> >>>                    .to("mock:result")
>> >>>            .end();
>> >>>    }
>> >>> }
>> >>>
>> >>> and the following configuration:
>> >>> 
> <beans xmlns="http://www.springframework.org/schema/beans"
>>
>  >>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> >>>    xmlns:camel="http://camel.apache.org/schema/spring"
>> >>>    xsi:schemaLocation="
>> >>>        http://www.springframework.org/schema/beans
>> >>> http://www.springframework.org/schema/beans/spring-beans.xsd
>> >>>        http://camel.apache.org/schema/spring
>> >>> http://camel.apache.org/schema/spring/camel-spring.xsd">
>> >>>
>> >>> 
> <camel:camelContext id="com.xxx.xxxx.wireTapTest">
>> >>>
>> >>> 
> <camel:routeBuilder ref="wireTapRouteBuilder" />
>> >>>
>> >>> 
> <camel:threadPoolProfile id="executorService" defaultProfile="false"
>>
>  >>> poolSize="1" maxPoolSize="1" maxQueueSize="10000"
>> >>> rejectedPolicy="Discard"/>
>> >>>
>> >>> 
> </camel:camelContext>
>> >>>
>> >>> 
> <bean id="wireTapRouteBuilder"
>>
>  >>> class="com.xxx.xxxx.wiretap.WireTapRouteBuilder" />
>> >>> 
> </beans>
>> >>> We use the wiretap to send messages to an endpoint which persist some
>> key
>> >>> information about the execution in a database. It's important for us
>> to
>> >>> persist the messages into the database in the same order as they
>> arrive in
>> >>> the seda endpoint. Because of this, we configured the
>> threadPoolProfile to
>> >>> only use one thread. But some times the messages receive in the
>> reverse
>> >>> order in our database/mock endpoint.
>> >>>
>> >>> Any suggestions what is wrong here?
>> >>>
>> >>> Best,
>> >>> Christian
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> http://camel.465427.n5.nabble.com/WireTap-doesn-t-process-the-messages-in-the-order-they-arrived-for-each-message-tp5731733p5731969.html
>> >> Sent from the Camel - Users mailing list archive at Nabble.com.
>> >
>> >
>> >
>> > --
>> > Claus Ibsen
>> > -----------------
>> > Red Hat, Inc.
>> > FuseSource is now part of Red Hat
>> > Email: 

> cibsen@

>> > Web: http://fusesource.com
>> > Twitter: davsclaus
>> > Blog: http://davsclaus.com
>> > Author of Camel in Action: http://www.manning.com/ibsen
>>





--
View this message in context: http://camel.465427.n5.nabble.com/WireTap-doesn-t-process-the-messages-in-the-order-they-arrived-for-each-message-tp5731733p5731986.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: WireTap doesn't process the messages in the order they arrived for each message

Posted by Christian Müller <ch...@gmail.com>.
Thank you guys for the response!

Best,
Christian


On Fri, May 3, 2013 at 3:38 PM, Babak Vahdat <ba...@swissonline.ch>wrote:

> Yeah that was the cause of the behaviour threadPoolProfile instead of
> threadProfile, sorry!
>
> Babak
>
> Am 03.05.2013 um 15:23 schrieb Claus Ibsen <cl...@gmail.com>:
>
> > Babak well spotted. Though its not a bug.
> >
> > <threadPoolProfile> is a profile, (aka like a skeleton/template).
> > Which you use to define options, when creating new thread pools.
> >
> > Use
> > <threadPool>
> >
> > if you want a single thread pool and share it between the 2 wire taps,
> > then yeah Christian should use a <threadPool> instead.
> >
> > Mind that you can still define a <threadPoolProfile> and then a
> > <threadPool> which can use the profile to define its basic options.
> >
> > See more details at
> > http://camel.apache.org/threading-model.html
> >
> >
> >
> > On Fri, May 3, 2013 at 2:52 PM, Babak Vahdat
> > <ba...@swissonline.ch> wrote:
> >> Hi,
> >>
> >> The reason of the behavior you're observing is simply because of a
> current
> >> bug in Camel itself as given your 2 routes below, the current logic
> creates
> >> TWO instances of ExecutorService and not as you would expect ONE
> >> ExecutorService object (each being passed to each of the 2
> WireTapProcessor
> >> in your route). Looking at
> >> ProcessorDefinitionHelper#lookupExecutorServiceRef() the current logic
> looks
> >> up ONLY inside the registry causing 2 ExecutorService being returned by
> each
> >> invocation of this method.
> >>
> >> One simple current workaround to get that lovely green lines inside
> your IDE
> >> for your test would be to bind "executorService" into the Camel
> Registry, in
> >> your case ApplicationContextRegistry, that's:
> >>
> >>  <bean id="executorService" class="java.util.concurrent.Executors"
> >> factory-method="newSingleThreadExecutor" destroy-method="shutdownNow" />
> >>
> >> Would you mind to raise a ticket for this?
> >>
> >> Babak
> >>
> >>
> >> Christian Mueller wrote
> >>> The following test fails randomly fails at message 6 up to 961 (on my
> >>> machine). I consider this as an bug:
> >>>
> >>> public class WireTapTest extends CamelSpringTestSupport {
> >>>
> >>>    private int counter = 10000;
> >>>
> >>>    @Test
> >>>    public void test() throws InterruptedException {
> >>>        getMockEndpoint("mock:result").expectedMessageCount(counter *
> 2);
> >>>
> >>>        template.setDefaultEndpointUri("direct:start");
> >>>        for (int i = 0; i < counter; i++) {
> >>>            template.requestBody("Camel");
> >>>        }
> >>>
> >>>        assertMockEndpointsSatisfied();
> >>>
> >>>        List
> >>> <Exchange>
> >>> receivedExchanges =
> >>> getMockEndpoint("mock:result").getReceivedExchanges();
> >>>        for (int i = 0; i < receivedExchanges.size(); i++) {
> >>>            System.out.println("check exchange number " + i);
> >>>
> >>>            String body =
> >>> receivedExchanges.get(i).getIn().getBody(String.class);
> >>>            if (i % 2 == 0) {
> >>>                assertEquals("REQUEST", body);
> >>>            } else {
> >>>                assertEquals("RESPONSE", body);
> >>>            }
> >>>        }
> >>>    }
> >>>
> >>>    @Override
> >>>    protected AbstractApplicationContext createApplicationContext() {
> >>>        return new ClassPathXmlApplicationContext("bundle-context.xml");
> >>>    }
> >>> }
> >>>
> >>> which is using the following route:
> >>> public class WireTapRouteBuilder extends RouteBuilder {
> >>>
> >>>    @Override
> >>>    public void configure() throws Exception {
> >>>        from("direct:start")
> >>>
>  .wireTap("seda:wireTap").executorServiceRef("executorService")
> >>>            .setHeader("TYPE", constant("RESPONSE"))
> >>>
> >>> .wireTap("seda:wireTap").executorServiceRef("executorService");
> >>>
> >>>        from("seda:wireTap")
> >>>            .choice()
> >>>                .when(header("TYPE").isNull())
> >>>                    .setBody(constant("REQUEST"))
> >>>                    .to("mock:result")
> >>>                .otherwise()
> >>>                    .setBody(constant("RESPONSE"))
> >>>                    .to("mock:result")
> >>>            .end();
> >>>    }
> >>> }
> >>>
> >>> and the following configuration:
> >>> <beans xmlns="http://www.springframework.org/schema/beans"
> >>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> >>>    xmlns:camel="http://camel.apache.org/schema/spring"
> >>>    xsi:schemaLocation="
> >>>        http://www.springframework.org/schema/beans
> >>> http://www.springframework.org/schema/beans/spring-beans.xsd
> >>>        http://camel.apache.org/schema/spring
> >>> http://camel.apache.org/schema/spring/camel-spring.xsd">
> >>>
> >>> <camel:camelContext id="com.xxx.xxxx.wireTapTest">
> >>>
> >>> <camel:routeBuilder ref="wireTapRouteBuilder" />
> >>>
> >>> <camel:threadPoolProfile id="executorService" defaultProfile="false"
> >>> poolSize="1" maxPoolSize="1" maxQueueSize="10000"
> >>> rejectedPolicy="Discard"/>
> >>>
> >>> </camel:camelContext>
> >>>
> >>> <bean id="wireTapRouteBuilder"
> >>> class="com.xxx.xxxx.wiretap.WireTapRouteBuilder" />
> >>> </beans>
> >>> We use the wiretap to send messages to an endpoint which persist some
> key
> >>> information about the execution in a database. It's important for us to
> >>> persist the messages into the database in the same order as they
> arrive in
> >>> the seda endpoint. Because of this, we configured the
> threadPoolProfile to
> >>> only use one thread. But some times the messages receive in the reverse
> >>> order in our database/mock endpoint.
> >>>
> >>> Any suggestions what is wrong here?
> >>>
> >>> Best,
> >>> Christian
> >>
> >>
> >>
> >>
> >>
> >> --
> >> View this message in context:
> http://camel.465427.n5.nabble.com/WireTap-doesn-t-process-the-messages-in-the-order-they-arrived-for-each-message-tp5731733p5731969.html
> >> Sent from the Camel - Users mailing list archive at Nabble.com.
> >
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > Red Hat, Inc.
> > FuseSource is now part of Red Hat
> > Email: cibsen@redhat.com
> > Web: http://fusesource.com
> > Twitter: davsclaus
> > Blog: http://davsclaus.com
> > Author of Camel in Action: http://www.manning.com/ibsen
>

Re: WireTap doesn't process the messages in the order they arrived for each message

Posted by Babak Vahdat <ba...@swissonline.ch>.
Yeah that was the cause of the behaviour threadPoolProfile instead of threadProfile, sorry!

Babak

Am 03.05.2013 um 15:23 schrieb Claus Ibsen <cl...@gmail.com>:

> Babak well spotted. Though its not a bug.
> 
> <threadPoolProfile> is a profile, (aka like a skeleton/template).
> Which you use to define options, when creating new thread pools.
> 
> Use
> <threadPool>
> 
> if you want a single thread pool and share it between the 2 wire taps,
> then yeah Christian should use a <threadPool> instead.
> 
> Mind that you can still define a <threadPoolProfile> and then a
> <threadPool> which can use the profile to define its basic options.
> 
> See more details at
> http://camel.apache.org/threading-model.html
> 
> 
> 
> On Fri, May 3, 2013 at 2:52 PM, Babak Vahdat
> <ba...@swissonline.ch> wrote:
>> Hi,
>> 
>> The reason of the behavior you're observing is simply because of a current
>> bug in Camel itself as given your 2 routes below, the current logic creates
>> TWO instances of ExecutorService and not as you would expect ONE
>> ExecutorService object (each being passed to each of the 2 WireTapProcessor
>> in your route). Looking at
>> ProcessorDefinitionHelper#lookupExecutorServiceRef() the current logic looks
>> up ONLY inside the registry causing 2 ExecutorService being returned by each
>> invocation of this method.
>> 
>> One simple current workaround to get that lovely green lines inside your IDE
>> for your test would be to bind "executorService" into the Camel Registry, in
>> your case ApplicationContextRegistry, that's:
>> 
>>  <bean id="executorService" class="java.util.concurrent.Executors"
>> factory-method="newSingleThreadExecutor" destroy-method="shutdownNow" />
>> 
>> Would you mind to raise a ticket for this?
>> 
>> Babak
>> 
>> 
>> Christian Mueller wrote
>>> The following test fails randomly fails at message 6 up to 961 (on my
>>> machine). I consider this as an bug:
>>> 
>>> public class WireTapTest extends CamelSpringTestSupport {
>>> 
>>>    private int counter = 10000;
>>> 
>>>    @Test
>>>    public void test() throws InterruptedException {
>>>        getMockEndpoint("mock:result").expectedMessageCount(counter * 2);
>>> 
>>>        template.setDefaultEndpointUri("direct:start");
>>>        for (int i = 0; i < counter; i++) {
>>>            template.requestBody("Camel");
>>>        }
>>> 
>>>        assertMockEndpointsSatisfied();
>>> 
>>>        List
>>> <Exchange>
>>> receivedExchanges =
>>> getMockEndpoint("mock:result").getReceivedExchanges();
>>>        for (int i = 0; i < receivedExchanges.size(); i++) {
>>>            System.out.println("check exchange number " + i);
>>> 
>>>            String body =
>>> receivedExchanges.get(i).getIn().getBody(String.class);
>>>            if (i % 2 == 0) {
>>>                assertEquals("REQUEST", body);
>>>            } else {
>>>                assertEquals("RESPONSE", body);
>>>            }
>>>        }
>>>    }
>>> 
>>>    @Override
>>>    protected AbstractApplicationContext createApplicationContext() {
>>>        return new ClassPathXmlApplicationContext("bundle-context.xml");
>>>    }
>>> }
>>> 
>>> which is using the following route:
>>> public class WireTapRouteBuilder extends RouteBuilder {
>>> 
>>>    @Override
>>>    public void configure() throws Exception {
>>>        from("direct:start")
>>>            .wireTap("seda:wireTap").executorServiceRef("executorService")
>>>            .setHeader("TYPE", constant("RESPONSE"))
>>> 
>>> .wireTap("seda:wireTap").executorServiceRef("executorService");
>>> 
>>>        from("seda:wireTap")
>>>            .choice()
>>>                .when(header("TYPE").isNull())
>>>                    .setBody(constant("REQUEST"))
>>>                    .to("mock:result")
>>>                .otherwise()
>>>                    .setBody(constant("RESPONSE"))
>>>                    .to("mock:result")
>>>            .end();
>>>    }
>>> }
>>> 
>>> and the following configuration:
>>> <beans xmlns="http://www.springframework.org/schema/beans"
>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>    xmlns:camel="http://camel.apache.org/schema/spring"
>>>    xsi:schemaLocation="
>>>        http://www.springframework.org/schema/beans
>>> http://www.springframework.org/schema/beans/spring-beans.xsd
>>>        http://camel.apache.org/schema/spring
>>> http://camel.apache.org/schema/spring/camel-spring.xsd">
>>> 
>>> <camel:camelContext id="com.xxx.xxxx.wireTapTest">
>>> 
>>> <camel:routeBuilder ref="wireTapRouteBuilder" />
>>> 
>>> <camel:threadPoolProfile id="executorService" defaultProfile="false"
>>> poolSize="1" maxPoolSize="1" maxQueueSize="10000"
>>> rejectedPolicy="Discard"/>
>>> 
>>> </camel:camelContext>
>>> 
>>> <bean id="wireTapRouteBuilder"
>>> class="com.xxx.xxxx.wiretap.WireTapRouteBuilder" />
>>> </beans>
>>> We use the wiretap to send messages to an endpoint which persist some key
>>> information about the execution in a database. It's important for us to
>>> persist the messages into the database in the same order as they arrive in
>>> the seda endpoint. Because of this, we configured the threadPoolProfile to
>>> only use one thread. But some times the messages receive in the reverse
>>> order in our database/mock endpoint.
>>> 
>>> Any suggestions what is wrong here?
>>> 
>>> Best,
>>> Christian
>> 
>> 
>> 
>> 
>> 
>> --
>> View this message in context: http://camel.465427.n5.nabble.com/WireTap-doesn-t-process-the-messages-in-the-order-they-arrived-for-each-message-tp5731733p5731969.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
> 
> 
> 
> -- 
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> FuseSource is now part of Red Hat
> Email: cibsen@redhat.com
> Web: http://fusesource.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen

Re: WireTap doesn't process the messages in the order they arrived for each message

Posted by Claus Ibsen <cl...@gmail.com>.
Babak well spotted. Though its not a bug.

<threadPoolProfile> is a profile, (aka like a skeleton/template).
Which you use to define options, when creating new thread pools.

Use
<threadPool>

if you want a single thread pool and share it between the 2 wire taps,
then yeah Christian should use a <threadPool> instead.

Mind that you can still define a <threadPoolProfile> and then a
<threadPool> which can use the profile to define its basic options.

See more details at
http://camel.apache.org/threading-model.html



On Fri, May 3, 2013 at 2:52 PM, Babak Vahdat
<ba...@swissonline.ch> wrote:
> Hi,
>
> The reason of the behavior you're observing is simply because of a current
> bug in Camel itself as given your 2 routes below, the current logic creates
> TWO instances of ExecutorService and not as you would expect ONE
> ExecutorService object (each being passed to each of the 2 WireTapProcessor
> in your route). Looking at
> ProcessorDefinitionHelper#lookupExecutorServiceRef() the current logic looks
> up ONLY inside the registry causing 2 ExecutorService being returned by each
> invocation of this method.
>
> One simple current workaround to get that lovely green lines inside your IDE
> for your test would be to bind "executorService" into the Camel Registry, in
> your case ApplicationContextRegistry, that's:
>
>   <bean id="executorService" class="java.util.concurrent.Executors"
> factory-method="newSingleThreadExecutor" destroy-method="shutdownNow" />
>
> Would you mind to raise a ticket for this?
>
> Babak
>
>
> Christian Mueller wrote
>> The following test fails randomly fails at message 6 up to 961 (on my
>> machine). I consider this as an bug:
>>
>> public class WireTapTest extends CamelSpringTestSupport {
>>
>>     private int counter = 10000;
>>
>>     @Test
>>     public void test() throws InterruptedException {
>>         getMockEndpoint("mock:result").expectedMessageCount(counter * 2);
>>
>>         template.setDefaultEndpointUri("direct:start");
>>         for (int i = 0; i < counter; i++) {
>>             template.requestBody("Camel");
>>         }
>>
>>         assertMockEndpointsSatisfied();
>>
>>         List
>> <Exchange>
>>  receivedExchanges =
>> getMockEndpoint("mock:result").getReceivedExchanges();
>>         for (int i = 0; i < receivedExchanges.size(); i++) {
>>             System.out.println("check exchange number " + i);
>>
>>             String body =
>> receivedExchanges.get(i).getIn().getBody(String.class);
>>             if (i % 2 == 0) {
>>                 assertEquals("REQUEST", body);
>>             } else {
>>                 assertEquals("RESPONSE", body);
>>             }
>>         }
>>     }
>>
>>     @Override
>>     protected AbstractApplicationContext createApplicationContext() {
>>         return new ClassPathXmlApplicationContext("bundle-context.xml");
>>     }
>> }
>>
>> which is using the following route:
>> public class WireTapRouteBuilder extends RouteBuilder {
>>
>>     @Override
>>     public void configure() throws Exception {
>>         from("direct:start")
>>             .wireTap("seda:wireTap").executorServiceRef("executorService")
>>             .setHeader("TYPE", constant("RESPONSE"))
>>
>> .wireTap("seda:wireTap").executorServiceRef("executorService");
>>
>>         from("seda:wireTap")
>>             .choice()
>>                 .when(header("TYPE").isNull())
>>                     .setBody(constant("REQUEST"))
>>                     .to("mock:result")
>>                 .otherwise()
>>                     .setBody(constant("RESPONSE"))
>>                     .to("mock:result")
>>             .end();
>>     }
>> }
>>
>> and the following configuration:
>> <beans xmlns="http://www.springframework.org/schema/beans"
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>     xmlns:camel="http://camel.apache.org/schema/spring"
>>     xsi:schemaLocation="
>>         http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans.xsd
>>         http://camel.apache.org/schema/spring
>> http://camel.apache.org/schema/spring/camel-spring.xsd">
>>
>> <camel:camelContext id="com.xxx.xxxx.wireTapTest">
>>
>> <camel:routeBuilder ref="wireTapRouteBuilder" />
>>
>> <camel:threadPoolProfile id="executorService" defaultProfile="false"
>> poolSize="1" maxPoolSize="1" maxQueueSize="10000"
>> rejectedPolicy="Discard"/>
>>
>> </camel:camelContext>
>>
>> <bean id="wireTapRouteBuilder"
>> class="com.xxx.xxxx.wiretap.WireTapRouteBuilder" />
>> </beans>
>> We use the wiretap to send messages to an endpoint which persist some key
>> information about the execution in a database. It's important for us to
>> persist the messages into the database in the same order as they arrive in
>> the seda endpoint. Because of this, we configured the threadPoolProfile to
>> only use one thread. But some times the messages receive in the reverse
>> order in our database/mock endpoint.
>>
>> Any suggestions what is wrong here?
>>
>> Best,
>> Christian
>
>
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/WireTap-doesn-t-process-the-messages-in-the-order-they-arrived-for-each-message-tp5731733p5731969.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
FuseSource is now part of Red Hat
Email: cibsen@redhat.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen