Posted to users@camel.apache.org by nigel <ni...@pimco.com> on 2013/10/15 23:12:56 UTC

How to do aggregation of multiple InOut exchanges from different producers

Hi,
I'm trying to work out how to use Camel to do the following:
For efficiency reasons I want my main Processor to work on all messages
received in the last 10 seconds. Messages will be sent over HTTP (or JMS
going forward) from multiple endpoints using the InOut exchange pattern. I
want to collect them all together, present a list of the 'bodies' to a
processor and then have it return a list of responses. Each response then
needs to be matched to the relevant request and a reply sent to the
corresponding queue.
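
To make the requirement concrete, here is a minimal plain-Java sketch of the collect, process as a list, and reply-per-request idea. It is not Camel code: BatchingResponder, submit and flush are hypothetical names made up for illustration, and it assumes the batch processor returns exactly one result per request, in the same order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical helper: gathers requests, processes them as one list, replies to each caller. */
final class BatchingResponder<Q, R> {

    /** A waiting request paired with the future its caller holds. */
    private static final class Pending<Q, R> {
        final Q request;
        final CompletableFuture<R> reply = new CompletableFuture<>();
        Pending(Q request) { this.request = request; }
    }

    private final Function<List<Q>, List<R>> batchProcessor;
    private final List<Pending<Q, R>> pending = new ArrayList<>();

    BatchingResponder(Function<List<Q>, List<R>> batchProcessor) {
        this.batchProcessor = batchProcessor;
    }

    /** Called by each producer; the future completes only when the batch is flushed. */
    synchronized CompletableFuture<R> submit(Q request) {
        Pending<Q, R> p = new Pending<>(request);
        pending.add(p);
        return p.reply;
    }

    /** Called when the collection window closes: process everything, reply by position. */
    void flush() {
        List<Pending<Q, R>> batch;
        synchronized (this) {
            batch = new ArrayList<>(pending);
            pending.clear();
        }
        if (batch.isEmpty()) {
            return;
        }
        List<Q> bodies = new ArrayList<>();
        for (Pending<Q, R> p : batch) {
            bodies.add(p.request);
        }
        try {
            List<R> results = batchProcessor.apply(bodies);
            if (results.size() != batch.size()) {
                throw new IllegalStateException("expected one result per request");
            }
            // Assumption: result i belongs to request i.
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).reply.complete(results.get(i));
            }
        } catch (RuntimeException e) {
            // Fail every waiting caller rather than leaving them hanging.
            for (Pending<Q, R> p : batch) {
                p.reply.completeExceptionally(e);
            }
        }
    }
}
```

In this sketch something else has to call flush() when the window closes, for example a ScheduledExecutorService firing every 10 seconds. The point is only that each request's reply handle stays alive until the batch has been processed, which is the behaviour I'm after from the route.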
I've tried using the route sequence:
    from("jetty:http://localhost:1234/munge?matchOnUriPrefix=true")
    .aggregate(constant(true))
    .completionTimeout(5000L)
    .groupExchanges()
    .process(myClass);
but my producers seem to get a response immediately to their input message.
The timeout then fires and my processor gets the list, but the results don't
go anywhere.

Before I either dig deeper or give up, I wondered if a guru out there could
tell me if this is possible in Camel. It's very similar to the examples, but
none of the ones I've looked at use the InOut pattern.

Thanks
Nigel




--
View this message in context: http://camel.465427.n5.nabble.com/How-to-do-aggregation-of-multiple-InOut-exchanges-from-different-producers-tp5741621.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: How to do aggregation of multiple InOut exchanges from different producers

Posted by nigel <ni...@pimco.com>.
By experimentation I managed to get the JMS component to do what I needed. I
couldn't get the SJMS one to work as it's a bit too set in its ways once it
starts. The tricks were to:
1. Add disableReplyTo=true to stop the InOut exchange replying too early
2. Create a custom splitter to return the exchange list held in the headers
3. For some reason I had to use an onPrepare to lift the exchange up one
level - not sure why
4. Use the lazy reply logic as defined on the JMS component docs page.
Code below:

from("jms:topic:TOPIC.NAME?disableReplyTo=true")
      // Collect every incoming exchange into a single group
      .aggregate(constant(true))
      .completionTimeout(2000L)
      .groupExchanges()
      .process(new GroupProcessor())
      // Split the grouped list back into one exchange per original request
      .split().method(MyRoutes.class)
      .onPrepare(new Processor() {
          @Override
          public void process(Exchange exchange) throws Exception {
              // The split item arrives as an Exchange in the body;
              // copy its headers and body onto the current exchange
              Exchange realExchange = exchange.getIn().getBody(Exchange.class);
              exchange.getIn().getHeaders().clear();
              exchange.getIn().getHeaders().putAll(realExchange.getIn().getHeaders());
              exchange.getIn().setBody(realExchange.getIn().getBody());
          }
      })
      .process(new Processor() {
          @Override
          public void process(Exchange exchange) throws Exception {
              // Send the reply manually to the destination named in JMSReplyTo
              Destination replyDestination = exchange.getIn().getHeader("JMSReplyTo", Destination.class);
              exchange.getIn().setHeader("JMSDestination", replyDestination);
              exchange.getIn().removeHeader("JMSReplyTo");
              JmsComponent component = exchange.getContext().getComponent("jms", JmsComponent.class);
              JmsEndpoint endpoint = JmsEndpoint.newInstance(replyDestination, component);
              ProducerTemplate producerTemplate = exchange.getContext().createProducerTemplate();
              producerTemplate.sendBodyAndHeaders(endpoint, exchange.getIn().getBody(), exchange.getIn().getHeaders());
          }
      });

public class MyRoutes {
    // Returns the grouped exchanges stored in the header so the splitter can iterate over them
    @Handler
    public static List<Exchange> splitMe(@Header(value = Exchange.GROUPED_EXCHANGE) List<Exchange> body) {
        return body;
    }
}

Is there a simpler way? And would these tricks work with the jetty
component? I really like the SJMS component but it doesn't have the
flexibility.
Also, in the above, if there is an error then the messages are lost, so I
need to add a persistent store to the route as well.
Nigel



