Posted to users@camel.apache.org by Mark Citizen <mc...@secondmarket.com> on 2011/08/05 15:55:55 UTC

problems with Camel aggregation pattern

Hello,

I'm using Camel version 2.8.0.
I have a file with rows consisting of two columns: child, parent. Both are
strings. Example:
child1,parent1
child2,parent1
child3,parent1
child1,parent2
child2,parent2

I wrote a custom AggregationStrategy class that groups child values into a
single collection, based on the parent. My aggregate(Exchange
oldExchange,Exchange newExchange) method looks like this:

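 Map parent;
 if (oldExchange == null) {
  // first exchange for this parent: start a new map
  parent = new HashMap(); // Map is an interface, so use a concrete class
  newExchange.getIn().setBody(parent);
 } else {
  // later exchanges: reuse the map built so far
  parent = (Map) oldExchange.getIn().getBody();
 }
 //put child into map
 return oldExchange == null ? newExchange : oldExchange;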

Messages are correlated based on parent column. CompletionPredicate is coded
like this:

 //instance variable lastParent;

 matches(Exchange exchange) {
  tuple= exchange.getIn().getBody();

  if (lastParent == null) { //first record, no history
   lastParent = tuple.parent;
   return false;
  } else if (lastParent.equals(tuple.parent)) { //same parent, part of batch
   return false;
  }

  lastParent = tuple.parent; //different parent, new batch
  return true;
 }


My Camel route looks something like this:

from("direct:input").aggregate(parentCorrelation, myAggregationStrategy)
 .aggregationRepository(repository).completionPredicate(myCompletionPredicate)
 .process(myProcessor).to("direct:output");


The code works as expected up to a point. Child values are grouped into a
single map, based on the parent column. But the first map produced by the
aggregator is skipped (it is not delivered to the processor), and subsequent
maps delivered to the processor contain only one entry (the first child
column).

So the steps look something like this:

//last step for first batch
- adding map to repository, map contains 3 children, parent is parent1
- completion predicate kicks in since parent2 is different from parent1
- map for parent2 is removed from repository and sent to processor (map
contains only one entry)
- new map is created for parent2
- processing continues
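
The steps above can be reproduced outside Camel with a small plain-Java
simulation. This is only a sketch under assumptions: the class
PredicateTimingDemo and its members are hypothetical, and the model (one
pending group per parent, with the completion check run right after each
incoming row is added to its own group) is a simplification of the behavior
described in this post, not Camel's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PredicateTimingDemo {
    // Hypothetical stand-in for the aggregation repository: one pending group per parent.
    public final Map<String, List<String>> pending = new LinkedHashMap<>();
    // Groups released by the completion check, formatted as "parent=[children]".
    public final List<String> completed = new ArrayList<>();
    private String lastParent;

    // Same logic as the completion predicate shown in the post.
    public boolean matches(String parent) {
        if (lastParent == null) {          // first record, no history
            lastParent = parent;
            return false;
        } else if (lastParent.equals(parent)) { // same parent, part of batch
            return false;
        }
        lastParent = parent;               // different parent, new batch
        return true;
    }

    // Assumed order of operations: aggregate into the row's own group, then check completion.
    public void onRow(String child, String parent) {
        pending.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
        if (matches(parent)) {
            // The group being completed is the NEW parent's group, not the previous one.
            completed.add(parent + "=" + pending.remove(parent));
        }
    }

    public static void main(String[] args) {
        PredicateTimingDemo demo = new PredicateTimingDemo();
        String[][] rows = {
            {"child1", "parent1"}, {"child2", "parent1"}, {"child3", "parent1"},
            {"child1", "parent2"}, {"child2", "parent2"}
        };
        for (String[] row : rows) {
            demo.onRow(row[0], row[1]);
        }
        System.out.println("completed: " + demo.completed);
        System.out.println("pending:   " + demo.pending);
    }
}
```

With the sample rows, the only completed group is parent2 with a single
child, while the full parent1 group and the rest of parent2 stay pending.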


I think this behavior is related to how aggregationStrategy and
completionPredicate work together.
If anyone has dealt with such a scenario in the past, I'd appreciate any
info on how to make it work.
Thanks,

Mark




--
View this message in context: http://camel.465427.n5.nabble.com/problems-with-Camel-aggregation-pattern-tp4669621p4669621.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: problems with Camel aggregation pattern

Posted by Mark Citizen <mc...@secondmarket.com>.
Never mind, I realized my case only applied to data coming in in series.
In case anyone runs into a similar problem here is a possible solution:

1) implement custom class:
public class MySpecialRepository implements AggregationRepository, Predicate

2) use an instance of this class in your route as both aggregationRepository
and completionPredicate:
from("direct:input").aggregate(myCorrelationPredicate, myAggregationStrategy)
 .aggregationRepository(mySpecialRepository) // here
 .completionPredicate(mySpecialRepository) // and here

3) in the repository's remove method add delayed read logic (pseudocode):

//instance variable currentKey
public void remove(CamelContext context, String key, Exchange exchange)
 if (!key.equals(currentKey))
  return previousExchange
 unless the exchange header contains end of file
  (in which case return current exchange)
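
The delayed-read idea in step 3 can be illustrated without Camel. The
following is a minimal plain-Java sketch, assuming rows arrive in series and
are already grouped by parent. ReadBehindGrouper, accept and endOfInput are
hypothetical names; endOfInput plays the role of the end-of-file header. It
is not an AggregationRepository implementation.

```java
import java.util.ArrayList;
import java.util.List;

class ReadBehindGrouper {
    private String currentKey;                       // parent of the batch being built
    private List<String> currentChildren = new ArrayList<>();
    // Batches released so far, formatted as "parent=[children]".
    public final List<String> released = new ArrayList<>();

    // A row with a different parent releases the PREVIOUS batch, then starts a new one.
    public void accept(String child, String parent) {
        if (currentKey != null && !currentKey.equals(parent)) {
            flush();
        }
        currentKey = parent;
        currentChildren.add(child);
    }

    // Stand-in for the end-of-file marker: release whatever batch is still open.
    public void endOfInput() {
        if (currentKey != null) {
            flush();
        }
    }

    private void flush() {
        released.add(currentKey + "=" + currentChildren);
        currentChildren = new ArrayList<>();
        currentKey = null;
    }

    public static void main(String[] args) {
        ReadBehindGrouper grouper = new ReadBehindGrouper();
        String[][] rows = {
            {"child1", "parent1"}, {"child2", "parent1"}, {"child3", "parent1"},
            {"child1", "parent2"}, {"child2", "parent2"}
        };
        for (String[] row : rows) {
            grouper.accept(row[0], row[1]);
        }
        grouper.endOfInput();
        System.out.println(grouper.released);
    }
}
```

Here each batch is released one record late, when the next parent shows up,
and the end-of-input call is what releases the final batch.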

Thanks,

Mark


--
View this message in context: http://camel.465427.n5.nabble.com/problems-with-Camel-aggregation-pattern-tp4669621p4669869.html

Re: problems with Camel aggregation pattern

Posted by Mark Citizen <mc...@secondmarket.com>.
I narrowed down the problem to this:

1) Camel correlation selects records based on some key field.
2) Camel aggregation retrieves existing record from repository based on the
key field.
3) Camel checks if aggregation is completed by consulting the completion
predicate.
4) New correlation key comes into play.

It looks like there is no way to tell Camel to evaluate the completion
predicate for the previous batch when a new correlation key enters the scene.
In this scenario Camel checks the completion predicate as the new record
arrives, so returning TRUE from the completion predicate says 'the new record
is the last one from its batch', which is not right.

Would anyone have any ideas on how to tackle this, short of implementing
some kind of read-behind scenario?
Thanks,

Mark


--
View this message in context: http://camel.465427.n5.nabble.com/problems-with-Camel-aggregation-pattern-tp4669621p4669795.html