Posted to users@camel.apache.org by Th...@telekom.de on 2019/12/12 13:56:40 UTC

Aggregation and unitOfWork

Hi, 

Aggregation seems to lose the connection to the original message.

I have the following construct:

from(FILE_IN_URI)
      .split(new ZipSplitter()).shareUnitOfWork()
          .streaming()
          .choice()
               .when(simple("${in.header.CamelFileName} ends with 'xml'"))
                   .process(exchange -> {
                          System.out.println("debug exchange"); // EXCEPTION 1 is thrown here
                    })
                   .to(EP_AGGREGATION)
               .endChoice()
              .otherwise()
                  .to(FILE_OUT_URI)
                   .process(exchange -> {
                          System.out.println("debug exchange"); // EXCEPTION 2 is thrown here
                    })
                  .to(EP_AGGREGATION)
              .endChoice()
          .end()
       .end()
;

from(EP_AGGREGATION_)        
       .aggregate(header("foo"), new ThomasAggregationStrategy())
        .completionTimeout(10 * 1000)
        .completion(header("bar").isEqualTo(true))
        .process(exchange -> {
             System.out.println("debug exchange");
        })
        .throwException(IllegalArgumentException.class, "Thomas DEBUG") // EXCEPTION 3 is thrown here
;

The setup is a route with a splitter and a separate aggregation route.
Before the last "to" in each branch I added a processor with a println so that I can set a breakpoint and inspect the exchange.

When an exception is thrown in the splitter route (EXCEPTION 1 and 2, thrown in place of the debug println), the dead letter route works.
But when an exception is thrown after the aggregation (EXCEPTION 3), the information is gone.
I noticed that the exchange just before .to(EP_AGGREGATION) is different from the exchange that
ThomasAggregationStrategy::aggregate() sees: its unitOfWork is gone.

errorHandler(deadLetterChannel("direct:deadletter").useOriginalMessage());
    from("direct:deadletter")
        // save original message




Re: Aggregation and unitOfWork

Posted by Th...@telekom.de.
Hi, 

I tried that, but the exchanges I get in aggregate(old, new) are different and no longer carry the information.
And the aggregate method with three parameters is never called.

@Override
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange, Exchange inputExchange) {
    return aggregate(oldExchange, newExchange);
  }

Or should I put the original message into a message header?

Thomas


-----Original Message-----
From: Claus Ibsen <cl...@gmail.com> 
Sent: Monday, 16 December 2019 06:35
To: users@camel.apache.org
Subject: Re: Aggregation and unitOfWork

Hi

The output of the aggregate is not tied to any of its inputs; it's a separate new exchange, so they don't share any unit of work etc.
So the data you want as output from the aggregator must be added via the aggregation strategy.



--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Re: Aggregation and unitOfWork

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

The output of the aggregate is not tied to any of its inputs; it's a
separate new exchange, so they don't share any unit of work etc.
So the data you want as output from the aggregator must be added
via the aggregation strategy.
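
To illustrate the point, here is a minimal plain-Java sketch of the idea, not actual Camel code. Msg is a hypothetical stand-in for an exchange (a body plus a property map), "sourceFile" is a made-up property name, and aggregate(oldMsg, newMsg) only mirrors the shape of an aggregation strategy method. The assumption is that each part is tagged with a reference to its source before it reaches the aggregator, and the strategy copies that reference onto the result itself, because nothing is inherited automatically.

```java
import java.util.HashMap;
import java.util.Map;

public class AggregationSketch {

    // Hypothetical stand-in for an exchange: a body plus a property map.
    static final class Msg {
        final String body;
        final Map<String, Object> properties = new HashMap<>();

        Msg(String body) {
            this.body = body;
        }
    }

    // Made-up property name used to carry a reference to the original input.
    static final String SOURCE_FILE = "sourceFile";

    // Builds one split part and tags it with the file it came from.
    static Msg part(String body, String sourceFile) {
        Msg msg = new Msg(body);
        msg.properties.put(SOURCE_FILE, sourceFile);
        return msg;
    }

    // Same shape as aggregate(oldExchange, newExchange):
    // oldMsg is null for the first message of a group.
    static Msg aggregate(Msg oldMsg, Msg newMsg) {
        if (oldMsg == null) {
            // The first part already carries the source reference.
            return newMsg;
        }
        Msg merged = new Msg(oldMsg.body + "," + newMsg.body);
        // The merged message is a new object, so copy the reference explicitly.
        merged.properties.put(SOURCE_FILE, oldMsg.properties.get(SOURCE_FILE));
        return merged;
    }

    public static void main(String[] args) {
        Msg out = aggregate(null, part("a.xml", "in.zip"));
        out = aggregate(out, part("b.txt", "in.zip"));
        System.out.println(out.body);                        // a.xml,b.txt
        System.out.println(out.properties.get(SOURCE_FILE)); // in.zip
    }
}
```

In a real route the same pattern would mean setting a header or exchange property before the aggregation endpoint and copying it in the strategy, so that an error handler after the aggregator still has something to work with.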


