Posted to users@camel.apache.org by Riggy Software <ri...@hotmail.com> on 2019/01/23 10:32:40 UTC

Camel split on a Multiconsumer seda route exchange property mix up

Hi Guys,

I'm having a confusing problem with split aggregate.

An exchange comes in from a SEDA endpoint.
I then set a property.
Then I split the body (a list of keys) and aggregate the results with an ArrayListAggregationStrategy.
After the split-aggregate, I retrieve the property (which happens to be a map that I use to cross-reference against the body elements).

Now this works just fine with a single thread, but when I increase concurrentConsumers, some of the exchanges throw an NPE because the property set for the next exchange consumed by the SEDA endpoint somehow ends up on the other exchange. I'm not sure how this happens, as I thought such changes to an exchange would be local to the thread processing it.

Here is my route:



fromF("seda:%s?concurrentConsumers=%s", jobName, concurrentConsumers)
    .choice()
        .when(hasResultPredicate)
            .to(mainQuery)
        .endChoice()
    .end()
    .process(exchange -> {
        Map<Integer, PlayerDetailsDTO> playersMap = new HashMap<Integer, PlayerDetailsDTO>();
        List<Integer> playerIds = new ArrayList<Integer>();

        List<PlayerDetailsDTO> playerDetailsList = exchange.getIn().getBody(List.class);

        //message.setJobRecords(playerDetailsList);

        for (PlayerDetailsDTO playerDetailsDto : playerDetailsList) {
            playersMap.put(playerDetailsDto.getPlayerKey(), playerDetailsDto);
            playerIds.add(playerDetailsDto.getPlayerKey());
        }
        exchange.setProperty(PLAYERS_MAP, playersMap);
        exchange.getIn().setBody(playerIds);
    })
    .choice()
        .when(hasResultPredicate)
            .split(body())
                .to(supplementaryQuery)
                .aggregate(constant(true), new ArrayListAggregationStrategy())
                    .completionPredicate(splitSizeBatchPredicate)
                    .to("direct:suppProcessing")
                    .process(exchange -> {
                        Map<Integer, PlayerDetailsDTO> playersMap =
                            exchange.getProperty(RudJobResultProcessor.PLAYERS_MAP, Map.class);

                        List<List<PlayerStatusHistoryDTO>> statusHistoryListOfList =
                            exchange.getIn().getBody(List.class);

                        statusHistoryListOfList.stream().forEach(statusHistoryList -> {
                            if (!statusHistoryList.isEmpty()) {
                                PlayerDetailsDTO player = playersMap.get(statusHistoryList.get(0).getPlayerKey());
                                player.setPlayerStatusHistory(statusHistoryList); // <-- NPE occurring here when 2 consumers as player == null
                            }
                        });
                    })
                .end()
        .endChoice()
    .end()
    .to(SqlJobControllerRoute.POST_MESSAGE_SEDA);
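
One possible reading of the symptom, sketched in plain Java rather than Camel: the aggregate step uses constant(true) as its correlation key, so split parts from both consumers may land in the same group. The sketch below assumes (this is not confirmed from the Camel source) that a completed batch carries the properties of the first part that arrived. The class SharedAggregatorSketch, the Part type, and all method names are hypothetical stand-ins, not Camel APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SharedAggregatorSketch {

    // Hypothetical stand-in for a split part: one key plus the property map
    // it inherited from its parent exchange.
    static final class Part {
        final int key;
        final Map<Integer, String> parentMap;

        Part(int key, Map<Integer, String> parentMap) {
            this.key = key;
            this.parentMap = parentMap;
        }
    }

    // Builds the parts for one parent exchange; every part shares the parent's map.
    static List<Part> split(List<Integer> keys) {
        Map<Integer, String> map = new HashMap<>();
        for (int k : keys) {
            map.put(k, "player-" + k);
        }
        List<Part> parts = new ArrayList<>();
        for (int k : keys) {
            parts.add(new Part(k, map));
        }
        return parts;
    }

    // A single aggregation group for everything (like a constant correlation key).
    // A batch completes after batchSize parts and, by assumption, keeps the map
    // of the first part in the batch. Returns how many lookups come back null.
    static int countMisses(List<Part> arrivalOrder, int batchSize) {
        int misses = 0;
        List<Part> batch = new ArrayList<>();
        for (Part p : arrivalOrder) {
            batch.add(p);
            if (batch.size() == batchSize) {
                Map<Integer, String> carried = batch.get(0).parentMap;
                for (Part q : batch) {
                    if (carried.get(q.key) == null) {
                        misses++;
                    }
                }
                batch.clear();
            }
        }
        return misses;
    }

    // One consumer: parts of exchange A all arrive before parts of exchange B.
    static int sequentialMisses() {
        List<Part> order = new ArrayList<>(split(List.of(1, 2)));
        order.addAll(split(List.of(3, 4)));
        return countMisses(order, 2);
    }

    // Two consumers: parts of A and B arrive interleaved at the shared group.
    static int interleavedMisses() {
        List<Part> a = split(List.of(1, 2));
        List<Part> b = split(List.of(3, 4));
        List<Part> order = new ArrayList<>();
        for (int i = 0; i < a.size(); i++) {
            order.add(a.get(i));
            order.add(b.get(i));
        }
        return countMisses(order, 2);
    }

    public static void main(String[] args) {
        System.out.println("sequential misses: " + sequentialMisses());
        System.out.println("interleaved misses: " + interleavedMisses());
    }
}
```

If this reading is right, the null lookups would come from a batch that mixes parts of two parent exchanges, not from thread-local state leaking. Passing the strategy to the splitter itself, as in split(body(), new ArrayListAggregationStrategy()), might keep each aggregation scoped to its own parent exchange, though I have not verified that against this route.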


I inlined the processors for completeness. Any suggestions would be great.


Many thanks,
Naseem

P.S. I sent a message earlier about DB isolation levels, but the problem is in fact in the split, so please disregard it.

