You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Erwin Etchart <er...@gmail.com> on 2013/09/24 20:33:58 UTC

Fwd: Multicast and aggregate question

Hi everybody.

I have a question about this example.

    <!--
      Camel Spring XML context posted with the question.
      Flow: JMS queue, then aggregate pairs by header, then multicast to three
      direct routes, each of which calls a bean method and publishes to
      jms:COMPLETED when it completes successfully.
      The xmlns value is kept on a single line: a line break inside the
      attribute value becomes leading whitespace in the namespace URI.
    -->
    <camelContext id="CamelContext"
                  xmlns="http://camel.apache.org/schema/spring"
                  autoStartup="true"
                  trace="true">
        <propertyPlaceholder id="properties" location="ref:spring-properties" />

        <!-- Entry route: consumes from the main JMS queue and dispatches on the 'origin' header. -->
        <route>
            <from uri="jms:INCOMING.MAIN.QUEUE" />
            <choice>
                <when>
                    <simple>${in.header.origin} == 'XXX1'</simple>
                    <!--
                      Groups exchanges that share the same 'aggregationKey' header and
                      releases the group once 2 exchanges have been collected.
                      NOTE(review): per the follow-up in this thread, every aggregate must
                      have its own aggregation repository; confirm that
                      'aggregator.container' is not shared with another aggregate.
                    -->
                    <aggregate strategyRef="aggregator.strategy"
                               aggregationRepositoryRef="aggregator.container"
                               completionSize="2">
                        <correlationExpression>
                            <header>aggregationKey</header>
                        </correlationExpression>
                        <to uri="direct:to.multicast" />
                    </aggregate>
                </when>
                <otherwise>
                    <!-- Any other origin: log, park the message on a separate queue and stop routing. -->
                    <log message="UNRECOGNIZED MESSAGE SOURCE" />
                    <to uri="jms:INCOMING.MAIN.UNRECOGNIZED.QUEUE" />
                    <stop />
                </otherwise>
            </choice>
        </route>

        <!-- Fan-out route: sends each incoming exchange to all three direct endpoints below. -->
        <route>
            <from uri="direct:to.multicast" />
            <multicast stopOnException="false">
                <to uri="direct:1.IN.SPRING.REMOTING" />
                <to uri="direct:2.IN.SPRING.REMOTING" />
                <to uri="direct:3.IN.SPRING.REMOTING" />
            </multicast>
        </route>

        <!-- Worker route 1: converts the body, calls springRemotingTest, then publishes to jms:COMPLETED on success. -->
        <route autoStartup="true">
            <from uri="direct:1.IN.SPRING.REMOTING" />
            <transform>
                <method bean="specific.converter" method="toTestRequest" />
            </transform>
            <to uri="bean:spring.remote?method=springRemotingTest(${body})" />
            <onCompletion onCompleteOnly="true">
                <process ref="post.processor" />
                <to uri="jms:COMPLETED" />
            </onCompletion>
        </route>

        <!-- Worker route 2: same shape as route 1, calling springRemotingTest2. -->
        <route autoStartup="true">
            <from uri="direct:2.IN.SPRING.REMOTING" />
            <transform>
                <method bean="specific.converter" method="toTestRequest" />
            </transform>
            <to uri="bean:spring.remote?method=springRemotingTest2(${body})" />
            <onCompletion onCompleteOnly="true">
                <process ref="post.processor" />
                <to uri="jms:COMPLETED" />
            </onCompletion>
        </route>

        <!-- Worker route 3: same shape as route 1, calling springRemotingTest3. -->
        <route autoStartup="true">
            <from uri="direct:3.IN.SPRING.REMOTING" />
            <transform>
                <method bean="specific.converter" method="toTestRequest" />
            </transform>
            <to uri="bean:spring.remote?method=springRemotingTest3(${body})" />
            <onCompletion onCompleteOnly="true">
                <process ref="post.processor" />
                <to uri="jms:COMPLETED" />
            </onCompletion>
        </route>

    </camelContext>


The result is two queues:

INCOMING.MAIN.QUEUE with two messages arrived and 2 messages dequeued
COMPLETED with 9 messages

But I don't understand why 9 messages are arriving... I'm sending two
messages with the same aggregation key and after the second one the
aggregation does the release.

The aggregation strategy just returns the new exchange.

Regards

Erwin

Re: Multicast and aggregate question

Posted by Claus Ibsen <cl...@gmail.com>.
You use the multicast, which copies the message to all 3 routes. Read the
Camel documentation about the EIP to understand more.

On Tue, Sep 24, 2013 at 7:33 PM, Erwin Etchart <er...@gmail.com> wrote:
> Hi everybody.
>
> I have a question about this example.
>
>     <camelContext id="CamelContext" xmlns="
> http://camel.apache.org/schema/spring" autoStartup="true" trace="true">
>         <propertyPlaceholder id="properties"
> location="ref:spring-properties" />
>
>
>             <route>
>                 <from uri="jms:INCOMING.MAIN.QUEUE" />
>                 <choice>
>                     <when>
>                         <simple>${in.header.origin} == 'XXX1'</simple>
>                             <aggregate   strategyRef="aggregator.strategy"
>
> aggregationRepositoryRef="aggregator.container" completionSize="2"   >
>                                 <correlationExpression>
>                                     <header>aggregationKey</header>
>                                 </correlationExpression>
>                             <to uri="direct:to.multicast" />
>                             </aggregate>
>
>                     </when>
>                     <otherwise>
>                         <log message="UNRECOGNIZED MESSAGE SOURCE" />
>                         <to uri="jms:INCOMING.MAIN.UNRECOGNIZED.QUEUE" />
>                         <stop />
>                     </otherwise>
>                 </choice>
>         </route>
>
>         <route>
>             <from uri="direct:to.multicast" />
>             <multicast stopOnException="false" >
>                 <to uri="direct:1.IN.SPRING.REMOTING" />
>                 <to uri="direct:2.IN.SPRING.REMOTING" />
>                 <to uri="direct:3.IN.SPRING.REMOTING" />
>             </multicast>
>
>         </route>
>
>
>
>         <route autoStartup="true"  >
>             <from uri="direct:1.IN.SPRING.REMOTING" />
>             <transform>
>                 <method bean="specific.converter" method="toTestRequest"/>
>             </transform>
>             <to
> uri="bean:spring.remote?method=springRemotingTest(${body})"  />
>             <onCompletion onCompleteOnly="true" >
>                 <process ref="post.processor" />
>                 <to uri="jms:COMPLETED" />
>             </onCompletion>
>
>         </route>
>
>         <route autoStartup="true"  >
>             <from uri="direct:2.IN.SPRING.REMOTING" />
>             <transform>
>                 <method bean="specific.converter" method="toTestRequest"/>
>             </transform>
>             <to
> uri="bean:spring.remote?method=springRemotingTest2(${body})" />
>             <onCompletion onCompleteOnly="true" >
>                 <process ref="post.processor" />
>                 <to uri="jms:COMPLETED" />
>             </onCompletion>
>
>         </route>
>
>
>         <route autoStartup="true"  >
>             <from uri="direct:3.IN.SPRING.REMOTING" />
>             <transform>
>                 <method bean="specific.converter" method="toTestRequest"/>
>             </transform>
>             <to
> uri="bean:spring.remote?method=springRemotingTest3(${body})" />
>             <onCompletion onCompleteOnly="true" >
>                 <process ref="post.processor" />
>                 <to uri="jms:COMPLETED" />
>             </onCompletion>
>         </route>
>
>     </camelContext>
>
>
> The result is two queues:
>
> INCOMING.MAIN,QUEUE with two messsages arrived and 2 messages dequeued
> COMPLETED with 9 messages
>
> But i don´t understand why 9 messages are arriving... i'm sending two
> messages with the same aggregation Key and after the second one the
> aggregation does the release.
>
> The agregation strategy just returns the new exchange;
>
> Regards
>
> Erwin



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Multicast and aggregate question

Posted by Erwin Etchart <er...@gmail.com>.
I didn't know it but every aggregate needs its own repository.

Best Regards.

Erwin


2013/9/24 Erwin Etchart <er...@gmail.com>

> Hi everybody.
>
> I have a question about this example.
>
>     <camelContext id="CamelContext" xmlns="
> http://camel.apache.org/schema/spring" autoStartup="true" trace="true">
>         <propertyPlaceholder id="properties"
> location="ref:spring-properties" />
>
>
>             <route>
>                 <from uri="jms:INCOMING.MAIN.QUEUE" />
>                 <choice>
>                     <when>
>                         <simple>${in.header.origin} == 'XXX1'</simple>
>                             <aggregate
> strategyRef="aggregator.strategy"
>
> aggregationRepositoryRef="aggregator.container" completionSize="2"   >
>                                 <correlationExpression>
>                                     <header>aggregationKey</header>
>                                 </correlationExpression>
>                             <to uri="direct:to.multicast" />
>                             </aggregate>
>
>                     </when>
>                     <otherwise>
>                         <log message="UNRECOGNIZED MESSAGE SOURCE" />
>                         <to uri="jms:INCOMING.MAIN.UNRECOGNIZED.QUEUE" />
>                         <stop />
>                     </otherwise>
>                 </choice>
>         </route>
>
>         <route>
>             <from uri="direct:to.multicast" />
>             <multicast stopOnException="false" >
>                 <to uri="direct:1.IN.SPRING.REMOTING" />
>                 <to uri="direct:2.IN.SPRING.REMOTING" />
>                 <to uri="direct:3.IN.SPRING.REMOTING" />
>             </multicast>
>
>         </route>
>
>
>
>         <route autoStartup="true"  >
>             <from uri="direct:1.IN.SPRING.REMOTING" />
>             <transform>
>                 <method bean="specific.converter" method="toTestRequest"/>
>             </transform>
>             <to
> uri="bean:spring.remote?method=springRemotingTest(${body})"  />
>             <onCompletion onCompleteOnly="true" >
>                 <process ref="post.processor" />
>                 <to uri="jms:COMPLETED" />
>             </onCompletion>
>
>         </route>
>
>         <route autoStartup="true"  >
>             <from uri="direct:2.IN.SPRING.REMOTING" />
>             <transform>
>                 <method bean="specific.converter" method="toTestRequest"/>
>             </transform>
>             <to
> uri="bean:spring.remote?method=springRemotingTest2(${body})" />
>             <onCompletion onCompleteOnly="true" >
>                 <process ref="post.processor" />
>                 <to uri="jms:COMPLETED" />
>             </onCompletion>
>
>         </route>
>
>
>         <route autoStartup="true"  >
>             <from uri="direct:3.IN.SPRING.REMOTING" />
>             <transform>
>                 <method bean="specific.converter" method="toTestRequest"/>
>             </transform>
>             <to
> uri="bean:spring.remote?method=springRemotingTest3(${body})" />
>             <onCompletion onCompleteOnly="true" >
>                 <process ref="post.processor" />
>                 <to uri="jms:COMPLETED" />
>             </onCompletion>
>         </route>
>
>     </camelContext>
>
>
> The result is two queues:
>
> INCOMING.MAIN,QUEUE with two messsages arrived and 2 messages dequeued
> COMPLETED with 9 messages
>
> But i don´t understand why 9 messages are arriving... i'm sending two
> messages with the same aggregation Key and after the second one the
> aggregation does the release.
>
> The agregation strategy just returns the new exchange;
>
> Regards
>
> Erwin
>
>
>