You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Giuliano Santandrea <gs...@imolinfo.it> on 2017/08/22 11:28:21 UTC

Fwd: problem in aggregator followed by split

Hello,
I've experienced a strange behavior in a camel route: if I removed a
logging instruction my Camel route broke.
After making some tests I realized that the problem was in the
behaviour of the aggregator and the splitter and I'd like you to clear
some doubts about the pipeline mechanism.
I tried it with both Camel 2.17.7 and 2.19.2.

For example if we expect 2 messages and we implement a custom
aggregation that returns an exchange with an OUT like this:

public class MyAggregationStrategyFaulty implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        String oldBody = oldExchange.getIn().getBody(String.class);
        String currentBody = newExchange.getIn().getBody(String.class);
        List<String> newOut = new ArrayList<>();
        newOut.add(oldBody);
        newOut.add(currentBody);
        oldExchange.getOut().setBody(newOut);
        return oldExchange;
    }
}

and then we implement a route like the following:

public class ProvaAggregateFaulty extends CamelTestSupport {
    private final static Logger logger =
LoggerFactory.getLogger(ProvaAggregateFaulty.class);

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {

                from("direct:start")
                        .log("Receiving \"${body}\" with correlation
key \"${header.myId}\"")
                        .aggregate(header("myId"),
                                     new MyAggregationStrategyFaulty())
                                     .completionSize(2)
                            .log("END AGGREGATE SUBROUTE (in, out):
\"${in.body}\", \"${out.body}\"")
                            .log("END AGGREGATE SUBROUTE 2 (in, out):
\"${in.body}\", \"${out.body}\"")
                            .end()

                        .log("END ROUTE (in, out): \"${in.body}\",
\"${out.body}\"")
                        .to("mock:result");

            }
        };
    }

    @Test
    public void test() throws Exception {
        context.setTracing(true);

        template.sendBodyAndHeader("direct:start", "a", "myId", 1);
        template.sendBodyAndHeader("direct:start", "b", "myId", 1);

        Thread.sleep(1000 * 10);
        assertMockEndpointsSatisfied();
    }


}

we can see that the pipeline mechanism (OUT->IN) is not triggered when
processing the first node of the aggregator subroute (the log node is
seeing a message with both IN and OUT).
Note however that if we insert another  node after the logging, for
example another logging, the pipeline is triggered after the first log
:
2017-08-22 11:22:51,588 [main           ] INFO  route1
        - Receiving "a" with correlation key "1"
2017-08-22 11:22:51,591 [main           ] INFO  route1
        - END ROUTE (in, out): "a", ""
2017-08-22 11:22:51,591 [main           ] INFO  route1
        - Receiving "b" with correlation key "1"
2017-08-22 11:22:51,595 [main           ] INFO  route1
        - END AGGREGATE SUBROUTE (in, out): "a", "[a, b]"
2017-08-22 11:22:51,595 [main           ] INFO  route1
        - END AGGREGATE SUBROUTE 2 (in, out): "[a, b]", ""
2017-08-22 11:22:51,595 [main           ] INFO  route1
        - END ROUTE (in, out): "b", ""


In my specific case I had to work with a route that had a splitter
inside the aggregator, as in the following example:
                from("direct:start")
                        .log("Receiving \"${body}\" with correlation
key \"${header.myId}\"")
                        .aggregate(header("myId"),
                                new MyAggregationStrategyFaulty())
                                        .completionSize(2)
                                .split(body())
                                      .log("SPLIT SUBROUTE (in, out):
\"${in.body}\", \"${out.body}\"")
                                      .end()
                                .end()
                        .log("END ROUTE (in, out): \"${in.body}\",
\"${out.body}\"")
                        .to("mock:result");
            }
with the following results:
2017-08-22 11:41:54,526 [main           ] INFO  route1
        - Receiving "a" with correlation key "1"
2017-08-22 11:41:54,529 [main           ] INFO  route1
        - END ROUTE (in, out): "a", ""
2017-08-22 11:41:54,531 [main           ] INFO  route1
        - Receiving "b" with correlation key "1"
2017-08-22 11:41:54,540 [main           ] INFO  route1
        - SPLIT SUBROUTE (in, out): "a", "[a, b]"
2017-08-22 11:41:54,540 [main           ] INFO  route1
        - END ROUTE (in, out): "b", ""


In this case:
- the message arriving to the splitter has both IN and OUT as before
(IN contains the first message, OUT contains the aggregated list) ,
because of the missing pipeline-triggering
- the splitter uses body(), but the body() method returns the IN even
if the OUT is not null. The splitting incorrectly happens on the IN
body instead of the OUT, that is a string containing the first
message, so there is only 1 iteration
    (aside note: the outBody() in Camel seems to be deprecated. What
is a correct alternative?)
- the aggregation strategy returns an exchange with an OUT as before.
I think this is not a correct way to implement an Aggregation
Strategy, in fact I didn't find any example in the documentation of an
aggregation strategy returning an exchange with an OUT

If after the logging the message is written to file the arraylist in
the OUT is shifted into the IN and we have an error:

                from("direct:start")
                        .log("Receiving \"${body}\" with correlation
key \"${header.myId}\"")
                        .aggregate(header("myId"),
                                new MyAggregationStrategyFaulty())
                                        .completionSize(2)
                                .log("AGGREGATE SUBROUTE (in, out):
\"${in.body}\", \"${out.body}\"")
                                .to("file:data?fileName=asd")
                                .end()
                        .log("END ROUTE (in, out): \"${in.body}\",
\"${out.body}\"")
                        .to("mock:result");


2017-08-22 11:54:00,408 [main           ] INFO  route1
        - Receiving "a" with correlation key "1"
2017-08-22 11:54:00,410 [main           ] INFO  route1
        - END ROUTE (in, out): "a", ""
2017-08-22 11:54:00,411 [main           ] INFO  route1
        - Receiving "b" with correlation key "1"
2017-08-22 11:54:00,415 [main           ] INFO  route1
        - AGGREGATE SUBROUTE (in, out): "a", "[a, b]"
2017-08-22 11:54:00,418 [main           ] ERROR DefaultErrorHandler
        - Failed delivery for (MessageId:
ID-giuliano-Latitude-E6540-37043-1503395640049-0-7 on ExchangeId:
ID-giuliano-Latitude-E6540-37043-1503395640049-0-5). Exhausted after
delivery attempt: 1 caught:
org.apache.camel.component.file.GenericFileOperationFailedException:
Cannot store file: data/asd
org.apache.camel.component.file.GenericFileOperationFailedException:
Cannot store file: data/asd
...
Caused by: org.apache.camel.InvalidPayloadException: No body available
of type: java.io.InputStream but has value: [a, b] of type:
java.util.ArrayList ....
...



So in order to correct my error I changed the implementation of the
aggregation strategy returning an exchange with the IN instead of the
OUT, but I have a few questions:
- Is the missing pipeline-triggering inside the aggregator subroute a
bug or a desired feature?
- Is it correct to implement an aggregation strategy returning an
exchange with OUT ?

Thanks,
Giuliano Santandrea