You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Claus Ibsen (JIRA)" <ji...@apache.org> on 2019/08/06 19:16:00 UTC

[jira] [Assigned] (CAMEL-13287) AggregationStrategy - Access original exchange in aggregate method

     [ https://issues.apache.org/jira/browse/CAMEL-13287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen reassigned CAMEL-13287:
-----------------------------------

    Assignee: Claus Ibsen

>  AggregationStrategy - Access original exchange in aggregate method
> -------------------------------------------------------------------
>
>                 Key: CAMEL-13287
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13287
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-core
>            Reporter: Balazs Szeti
>            Assignee: Claus Ibsen
>            Priority: Major
>             Fix For: 3.0.0
>
>
> For aggregation after multicast/splitter the original exchange should optionally be available in the aggregate method.
>  There are several use cases when we would like to go-on processing the original exchange after multicast, but we'd like to enrich it with the outcome of the called routes. For example:
> {code:java}
> rest()
> .get("orders/{orderId}")
> .route()
>   .to("direct:getOrderDetails") //get UserId, ItemId
>   .setBody(method(this,"createResponsePojo"))
>   .multicast(new MyAggregationStrategy())
>     .to("direct:getUserDetails")
>     .to("direct:getDeliveryAddress")
>     .to("direct:getItemDetails")
>   .end()
> ;
> {code}
> If any of the called routes fail, we still would like return a partial response in our service (this is a common requirement in case of microservices). The MyAggregationStrategy should simply enrich the ResponePojo object from the original exchange somehow with the new exchanges coming from the sub-routes. See this example:
> {code:java}
> public class MyAggregationStrategy implements AggregationStrategy {
> public Exchange aggregateWithOriginal(Exchange oldExchange, Exchange newExchange, Exchange originalExchange) {
>   Exchange exchange = oldExchange != null ? oldExchange : originalExchange;
>   ResponsePojo response = exchange.getMessage().getBody(ResponsePojo.class);
>   if (! newExchange.isFailed()) {
>     // ... Add newExchange body somehow to ResponsePojo object...
>   }
>   return exchange;
> }
> ...
> }
> {code}
> Currently only the exchanges from the "sub-routes" are available during aggregation, so the exchange after the aggregate will be one (the first) of those. This comes with multiple problems:
>  * Though the exchanges in the sub-routes are copies of the original exchange, sub routes make modifications: modify headers, modify properties, etc. Usually we don't want to see all these set by a sub-route on the final aggregated exchange. It's only noise. "Whatever happens in the sub-route, should stay in the sub-route." - We only want to see on the aggregated exchange what we "took" intentionally from the sub-route exchanges.
>  * If we use stopOnException(true) our life is simple because we usually don't have to worry about exceptions in aggregate, we will stop anyway. The aggregation logic can become complicated if we want to go on with processing in case of errors. The first time aggregate() is called the oldExchange is null, so we usually take the newExchange as the return value. If this exchange has an Exception, we need to "clean" it first, otherwise the error handler will kick in after aggregation. This is non-trivial.
> h3. Suggested approach
> Let's extend the AggregationStrategy interface with a new method that takes three exchanges. This should be called after Multicast EIP (Enrich EIP is simple, it only has two exchanges).
> With a default implementations we can keep the interface compatible:
> {code:java}
> public interface AggregationStrategy {
> /**
> * Aggregates an old, a new and the original exchange together to create a single combined exchange.
> *
> * @param oldExchange the oldest exchange, which is the returned value of the previous aggregation on null.
> * @param newExchange the newest exchange
> * @param originalExchange the original exchange before Multicast or Splitter EIP. Null in case of Enrich EIP.
> * @return a combined exchange, favor returning the oldExchange
> */
> default Exchange aggregateWithOriginal(Exchange oldExchange, Exchange newExchange, Exchange originalExchange) {
>   return aggregate(oldExchange, newExchange);
> };
> //Maybe we should have a default implementation here too so one can only implement aggregateWithOriginal()
> Exchange aggregate(Exchange oldExchange, Exchange newExchange); 
> ... 
> }
> {code}
> {code:java}
>  {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)