Posted to users@camel.apache.org by Ar...@nomura.com on 2014/10/29 19:46:26 UTC

Aggregator race condition in recovery task?

Hi,

We’re using the Camel 2.12.3 library for aggregation in one of our applications, with JdbcAggregationRepository maintaining the aggregation state. The aggregator is supposed to complete on a time interval, i.e. every ‘n’ ms it should send out the aggregated output.
We’re frequently seeing duplicate messages generated by the recovery task, which is supposed to run every 5 seconds.

After enabling debug logging and investigating further, I see the following sequence of events in the logs:

1. [AggregateTimeoutChecker thread] - AggregateProcessor.run() : Completion interval triggered for correlation key
2. [AggregateTimeoutChecker thread] - JdbcAggregationRepository.doInTransactionWithoutResult() : Removing key
3. [AggregateRecoverChecker thread] - AggregateProcessor.run() : Starting recover check
4. [AggregateRecoverChecker thread] - JdbcAggregationRepository.mapRow() : getKey
5. [AggregateTimeoutChecker thread] - AggregateProcessor.onSubmitCompletion() : Aggregation complete for correlation key
6. [AggregateRecoverChecker thread] - AggregateProcessor.run() : Loading aggregated exchange with id
7. [AggregateTimeoutChecker thread] - AggregateProcessor.run() : Processing aggregated exchange
8. [AggregateRecoverChecker thread] - JdbcAggregationRepository.recover() - Recovering exchangeId
9. [AggregateRecoverChecker thread] - AggregateProcessor.run() - Delivery attempt: 1 to recover aggregated exchange with id
10. [AggregateRecoverChecker thread] - AggregateProcessor.onSubmitCompletion() - Aggregation complete for correlation key


In short, the timeout checker removes the exchange from the XXX table and adds it to the XXX_completed table. A little later in its processing, it adds the exchange to the “inProgressCompleteExchanges” set.
The recover task scans the XXX_completed table and, for each exchange found, checks whether it is present in “inProgressCompleteExchanges” before attempting to recover it.
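The interleaving can be replayed deterministically on a single thread. This is a hypothetical model, not Camel code: the sets "aggregating" and "completed" stand in for the XXX and XXX_completed tables, and the class and method names are made up for illustration. It assumes, as described above, that the timeout checker adds the exchange to inProgressCompleteExchanges only after moving it to the completed table, and that the recover checker's contains() check falls inside that window. The registerBeforeMove flag is an illustrative reordering for comparison only, not a claim about how Camel handles it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical single-threaded replay of one interleaving consistent with the log.
 * "aggregating" and "completed" stand in for the XXX and XXX_completed tables.
 * This models the ordering only; it is not Camel code.
 */
public class RecoverRaceReplay {

    /**
     * Returns how many times the exchange gets delivered.
     * registerBeforeMove = false: the set is updated after the move to the
     * completed table (the order described in the post).
     * registerBeforeMove = true: a hypothetical reordering, for comparison.
     */
    public static int replay(boolean registerBeforeMove) {
        Set<String> aggregating = ConcurrentHashMap.newKeySet();
        Set<String> completed = ConcurrentHashMap.newKeySet();
        Set<String> inProgressCompleteExchanges = ConcurrentHashMap.newKeySet();
        List<String> deliveries = new ArrayList<>();

        String id = "exchange-1";
        aggregating.add(id);

        // Timeout checker (steps 1-2): interval fires, row moves to the completed table.
        if (registerBeforeMove) {
            inProgressCompleteExchanges.add(id);
        }
        aggregating.remove(id);
        completed.add(id);

        // Recover checker runs inside the window: scan the completed table, then consult the set.
        for (String scannedId : completed) {
            if (!inProgressCompleteExchanges.contains(scannedId)) {
                deliveries.add("recovered " + scannedId);
            }
        }

        // Timeout checker resumes: registers the exchange (no-op if already present) and delivers it.
        inProgressCompleteExchanges.add(id);
        deliveries.add("completed " + id);

        return deliveries.size();
    }

    public static void main(String[] args) {
        System.out.println("order described in the post: " + replay(false)); // 2
        System.out.println("register before the move:    " + replay(true));  // 1
    }
}
```

With the order from the post the exchange is delivered twice (once by recovery, once by the normal completion); registering it before the move closes that particular window in this model.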

This usage of “inProgressCompleteExchanges” looks dodgy to me, because I don’t see any explicit locking around it in the recover task while the timeout task may be updating it at the same time.
This is how the set is created in AggregateProcessor:
private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
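For reference, every individual call on a set built this way is thread-safe: concurrent add() calls on a ConcurrentHashMap-backed set are never lost, and a completed add() is visible to a later contains() on another thread. The small sketch below (class and method names are hypothetical) exercises the same construction from several threads. It only covers single calls, not sequences of calls.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical demo: concurrent add() calls on a ConcurrentHashMap-backed
 * set are never lost. This covers single calls only, not sequences of calls.
 */
public class ConcurrentSetDemo {

    /** Adds threads * idsPerThread distinct ids from several threads; returns the final size. */
    public static int concurrentAdds(int threads, int idsPerThread) {
        // Same construction as the field in AggregateProcessor.
        Set<String> inProgress =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int thread = t;
            pool.submit(() -> {
                for (int i = 0; i < idsPerThread; i++) {
                    inProgress.add("exchange-" + thread + "-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // All workers have finished, so every add() has completed.
        return inProgress.size();
    }

    public static void main(String[] args) {
        System.out.println(concurrentAdds(8, 10_000)); // 80000
    }
}
```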

How is synchronization ensured in this case?
Is there something I’m missing in this analysis? I don’t understand how calls to add and contains on this set from different threads are synchronized here. Any help in this regard would be much appreciated.
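The distinction that matters here is between single calls and sequences of calls. add() and contains() are each atomic, but a contains() check on one thread followed later by an add() on another leaves a window in which both threads see the id as absent. Set.add() returns false when the element is already present, so using its return value as the check turns check-then-act into a single atomic step. The sketch below contrasts the two patterns; the helper names are hypothetical, and this is a general illustration, not Camel's code or necessarily how Camel addresses the issue.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helpers contrasting check-then-act with claim-by-add(). */
public class ClaimDemo {

    /** Check-then-act: each call is thread-safe, but the pair is not atomic. */
    static boolean tryClaimRacy(Set<String> inProgress, String id) {
        if (inProgress.contains(id)) {
            return false;
        }
        // Another thread may pass the same contains() check before this add().
        inProgress.add(id);
        return true;
    }

    /** Claim-by-add: add() returns true only for the caller that inserted the id. */
    static boolean tryClaim(Set<String> inProgress, String id) {
        return inProgress.add(id);
    }

    /** Races `threads` callers for one id; returns how many believed they won. */
    public static int winners(int threads, boolean atomic) {
        Set<String> inProgress =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        AtomicInteger wins = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                try {
                    start.await(); // release all callers at once
                    boolean won = atomic
                            ? tryClaim(inProgress, "exchange-1")
                            : tryClaimRacy(inProgress, "exchange-1");
                    if (won) {
                        wins.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return wins.get();
    }

    public static void main(String[] args) {
        // Always exactly 1.
        System.out.println("claim by add():        " + winners(16, true));
        // At least 1; more than 1 whenever two callers interleave between contains() and add().
        System.out.println("contains() then add(): " + winners(16, false));
    }
}
```

The racy variant usually reports 1 as well, which is exactly why this kind of bug shows up only intermittently; the claim-by-add() variant is guaranteed to report 1.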

Thanks,
Archis





Re: Aggregator race condition in recovery task?

Posted by bendherville <be...@gmail.com>.
The issue you're experiencing may be related to
https://issues.apache.org/jira/browse/CAMEL-4271



--
View this message in context: http://camel.465427.n5.nabble.com/Aggregator-race-condition-in-recovery-task-tp5758315p5760530.html
Sent from the Camel - Users mailing list archive at Nabble.com.