Posted to dev@camel.apache.org by Moshe Recanati <re...@gmail.com> on 2017/04/25 19:29:10 UTC

Using stream resequencer with Custom comparator

Hi,
I'm using the stream resequencer with a custom comparator.
My problem is that for some elements the comparator returns 0, because they
are equal in terms of sort order but different in content.
As a result, these elements are not pushed to the consumer.
I tried adding a timestamp to the elements and using it as a tie-breaker
when the comparison is equal, but then the queue went into an endless loop.
Are you familiar with this situation? Is there anything I can do to solve it?

I know there is a batch option for the resequencer; however, I don't know
how to implement a custom comparator for that use case. Can you help with this?

Below you can find a short snippet from my code:

Thank you in advance,
Moshe


*Configure*
context.addComponent("activemq",
    ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false"));
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        // START SNIPPET: e1
        from("activemq:queue:test.queue")
            .resequence(header("seqno")).stream().timeout(50000).capacity(10).comparator(comparator)
        // END SNIPPET: e1
    }
});

*Comparator*
public int compare(Exchange arg0, Exchange arg1) {

    ICM icmA = (ICM) arg0.getIn().getBody();
    ICM icmB = (ICM) arg1.getIn().getBody();

    List<ICM> icmsA;
    List<ICM> icmsB;
    if (icmRepository != null) {
        icmsA = icmRepository.findByProspectIDAndIcmStatus(icmA.getProspect().get_id(), 1);
        icmsB = icmRepository.findByProspectIDAndIcmStatus(icmB.getProspect().get_id(), 1);
    }
    else
    {
        icmsA = new ArrayList<ICM>();
        icmsA.add(icmA);
        icmsB = new ArrayList<ICM>();
        icmsB.add(icmB);
    }
    int icmATime = (int) (icmA.getTimeStamp() / 1000);
    int icmBTime = (int) (icmB.getTimeStamp() / 1000) + 1;
    int ICMA = calculateValue(icmsA)/* * 10000 + icmATime */;
    int ICMB = calculateValue(icmsB)/* * 10000 + icmBTime */;

    int compare = ICMB - ICMA;

    /**
     * if (compare == 0 && icmA != icmB) { compare = (int) i++; }
     */

    System.out.println("^^^^^^^^comparing messages " + icmA + " .... " + icmB + " result= ICMA - " + ICMA
        + " ICMB - " + ICMB + " total - " + compare);
    return compare;
}
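
To illustrate the symptom outside of Camel, here is a minimal sketch in plain Java. It assumes (this is an assumption, not something confirmed in this thread) that the resequencer keeps its elements in a sorted set that behaves like java.util.TreeSet, where an element is silently dropped when the comparator returns 0 against an element already present. The Msg class, its score and id fields, and the TieBreakSketch name are hypothetical stand-ins for the ICM body and calculateValue(...). The sketch also shows a tie-break on a stable unique value. Unlike an incrementing counter or a "+ 1" offset on one side only, such a tie-break gives opposite results for compare(a, b) and compare(b, a), which the Comparator contract requires.

```java
import java.util.Comparator;
import java.util.TreeSet;

public class TieBreakSketch {

    // Hypothetical stand-in for the message body: a sort score plus a unique id.
    static final class Msg {
        final int score;
        final long id;

        Msg(int score, long id) {
            this.score = score;
            this.id = id;
        }
    }

    // Orders by score only (descending, like "ICMB - ICMA" but without overflow).
    // Two different messages with the same score compare as 0.
    static final Comparator<Msg> BY_SCORE =
            (a, b) -> Integer.compare(b.score, a.score);

    // Same primary order, plus a tie-break on the unique id.
    // compare(a, b) and compare(b, a) always have opposite signs.
    static final Comparator<Msg> BY_SCORE_THEN_ID =
            BY_SCORE.thenComparingLong(m -> m.id);

    // Adds three messages, two of which share a score, and reports how many survive.
    static int sizeAfterAdding(Comparator<Msg> comparator) {
        TreeSet<Msg> set = new TreeSet<>(comparator);
        set.add(new Msg(5, 1L));
        set.add(new Msg(5, 2L)); // same score as the first message
        set.add(new Msg(7, 3L));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterAdding(BY_SCORE));         // prints 2: one message was dropped
        System.out.println(sizeAfterAdding(BY_SCORE_THEN_ID)); // prints 3: all messages kept
    }
}
```

Whether a tie-break like this is enough for the stream resequencer depends on how the custom comparator also implements the predecessor/successor checks, which this sketch does not cover.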