Posted to issues@camel.apache.org by "Aaron Whiteside (JIRA)" <ji...@apache.org> on 2013/03/01 00:41:12 UTC

[jira] [Commented] (CAMEL-6042) AggregateProcessor/AggregationRepository does not deal with optimistic locking - will not work correctly in a distributed environment

    [ https://issues.apache.org/jira/browse/CAMEL-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13590063#comment-13590063 ] 

Aaron Whiteside commented on CAMEL-6042:
----------------------------------------

When do we expect the 2.11 release to be cut?
                
> AggregateProcessor/AggregationRepository does not deal with optimistic locking - will not work correctly in a distributed environment
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-6042
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6042
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-core
>    Affects Versions: 2.10.3
>         Environment: Glassfish + Gemini Blueprint + Spring 3.2
>            Reporter: Aaron Whiteside
>            Assignee: Claus Ibsen
>             Fix For: 2.11.0
>
>         Attachments: aggregate-optimistic-locking-support1.patch, aggregate-optimistic-locking-support2.patch, aggregate-optimistic-locking-support.patch, javadoc-and-unit-test-updates.patch
>
>
> AggregateProcessor/AggregationRepository does not deal with optimistic locking - and will not work correctly in a distributed environment.
> When I started to write a Voldemort-specific AggregationRepository, I saw that the AggregateProcessor does not deal with optimistic locking. It uses a single lock that is specific to each AggregateProcessor instance.
> In a distributed environment, where many Camel instances on many servers use a shared data store for the AggregationRepository, this will not work.
> Consider the following scenario using a persistent/shared AggregationRepository:
> Camel instance A on server A, receives Exchange 1..
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instance B on server B, receives Exchange 2..
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instances A and B then both proceed at the same time..
> # AggregateProcessor calls AggregationStrategy with the new exchange and a null old exchange
> # AggregateProcessor calls aggregationRepository.add() with the result (the new exchange)
> # Camel instance A successfully stores the new exchange.
> # Camel instance B fails with an exception stating that something is already stored using that exchange id.
> ## at this point I could write my AggregationRepository implementation to ignore the existing entry and overwrite it. But this would mean the existing exchange is lost and never aggregated.
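The interleaving above can be replayed deterministically in a small sketch. It is only an illustration under stated assumptions: a ConcurrentHashMap stands in for the shared data store, plain strings stand in for exchanges, and the class and method names (LostAggregationDemo, replay, aggregate) are hypothetical and not part of Camel.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class LostAggregationDemo {

    // Stand-in for an AggregationStrategy: a null old value yields just the new one.
    static String aggregate(String oldBody, String newBody) {
        return oldBody == null ? newBody : oldBody + "+" + newBody;
    }

    // Replays the scenario step by step and returns what ends up in the shared store.
    static String replay(boolean overwriteOnConflict) {
        // Hypothetical shared store: correlation id -> aggregated body.
        ConcurrentMap<String, String> sharedStore = new ConcurrentHashMap<>();
        String key = "correlation-1";

        // Both instances call get() before either has stored anything.
        String oldOnA = sharedStore.get(key); // null
        String oldOnB = sharedStore.get(key); // null

        // Both aggregate against a null old exchange.
        String resultA = aggregate(oldOnA, "exchange-1");
        String resultB = aggregate(oldOnB, "exchange-2");

        // Instance A stores first; instance B then finds an existing entry.
        sharedStore.putIfAbsent(key, resultA);
        boolean bStored = sharedStore.putIfAbsent(key, resultB) == null;
        if (!bStored && overwriteOnConflict) {
            // "Ignore the existing entry and overwrite it": A's exchange is lost.
            sharedStore.put(key, resultB);
        }
        return sharedStore.get(key);
    }

    public static void main(String[] args) {
        System.out.println("reject on conflict:    " + replay(false));
        System.out.println("overwrite on conflict: " + replay(true));
    }
}
```

In both variants the store ends up holding a single exchange ("exchange-1" when the second add is rejected, "exchange-2" when it overwrites), never the aggregate of the two.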
> A possible solution would be:
> a) Remove the lock from AggregateProcessor 
>  a1) Put the lock in the MemoryAggregationRepository or 
>  a2) Use the ConcurrentHashMap.putIfAbsent method (and then continue with b below).
> b) Introduce an AggregationRepositoryOptimisticLockException (name it whatever you want) that is thrown when an AggregationRepository detects that someone is trying to add() the same exchange id at the same time.
> Upon receiving this exception the AggregateProcessor would re-get() the oldExchange (now not null) from the AggregationRepository and call the AggregationStrategy again to aggregate the old and the new exchanges.
> This would ensure that no exchange fails to aggregate in a distributed environment, provided the underlying AggregationRepository can detect concurrent add() calls, which most should be able to do using conditional updates.
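A rough sketch of the proposed retry behaviour follows. Everything in it is an assumption made for illustration: the repository is a simplified in-memory stand-in whose add() takes the previously read value (this is not the Camel AggregationRepository signature), strings stand in for exchanges, and the exception is simply given the name suggested above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class OptimisticAggregationSketch {

    /** Hypothetical exception, named as proposed in the issue; not an existing Camel class. */
    static class AggregationRepositoryOptimisticLockException extends RuntimeException {
    }

    /** Hypothetical, simplified repository: String bodies stand in for exchanges. */
    static class InMemoryRepository {
        private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

        String get(String key) {
            return store.get(key);
        }

        /** Conditional update: succeeds only if the stored value is still what the caller read. */
        void add(String key, String expectedOld, String updated) {
            boolean stored = (expectedOld == null)
                    ? store.putIfAbsent(key, updated) == null
                    : store.replace(key, expectedOld, updated);
            if (!stored) {
                throw new AggregationRepositoryOptimisticLockException();
            }
        }
    }

    // Stand-in for an AggregationStrategy.
    static String aggregate(String oldBody, String newBody) {
        return oldBody == null ? newBody : oldBody + "+" + newBody;
    }

    /** get(), aggregate, add(); on a detected conflict, re-get() and aggregate again. */
    static String process(InMemoryRepository repo, String key, String incoming) {
        while (true) {
            String old = repo.get(key);
            String result = aggregate(old, incoming);
            try {
                repo.add(key, old, result);
                return result;
            } catch (AggregationRepositoryOptimisticLockException e) {
                // Another instance updated the entry first; loop and retry with the fresh value.
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        InMemoryRepository repo = new InMemoryRepository();
        Thread a = new Thread(() -> process(repo, "correlation-1", "exchange-1"));
        Thread b = new Thread(() -> process(repo, "correlation-1", "exchange-2"));
        a.start();
        b.start();
        a.join();
        b.join();
        // Both exchanges are present; their order depends on thread scheduling.
        System.out.println(repo.get("correlation-1"));
    }
}
```

The loop here retries without limit to keep the sketch short; a real implementation would presumably want a bounded number of attempts or a back-off between them.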
> For example:
> An SQL-based repository could try to insert into a table with a unique constraint on the correlation id. When the constraint is violated, JPA/JDBC/whatever will throw a unique constraint violation exception, which can be converted into an AggregationRepositoryOptimisticLockException.
> HawtDB supports optimistic locking out of the box by throwing an OptimisticUpdateException when it detects concurrent updates, so updating this component to take advantage of this feature should be very simple.
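For the SQL case, the conversion could look roughly like the sketch below, assuming plain JDBC. The translator class and the exception are hypothetical names. The check relies on SQLSTATE class 23 (integrity constraint violation) and on java.sql.SQLIntegrityConstraintViolationException, which some drivers throw. Note that class 23 also covers other constraint types, so a real repository might need a driver-specific check to single out unique violations.

```java
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

class ConstraintViolationTranslator {

    /** Hypothetical exception, named as proposed in the issue; not an existing Camel class. */
    static class AggregationRepositoryOptimisticLockException extends RuntimeException {
        AggregationRepositoryOptimisticLockException(Throwable cause) {
            super(cause);
        }
    }

    /** True if the driver reported an integrity constraint violation (SQLSTATE class 23). */
    static boolean isIntegrityViolation(SQLException e) {
        if (e instanceof SQLIntegrityConstraintViolationException) {
            return true;
        }
        String state = e.getSQLState();
        return state != null && state.startsWith("23");
    }

    /** Maps a failed INSERT to the optimistic-lock exception, or to a generic failure. */
    static RuntimeException translate(SQLException e) {
        if (isIntegrityViolation(e)) {
            return new AggregationRepositoryOptimisticLockException(e);
        }
        return new RuntimeException(e);
    }

    public static void main(String[] args) {
        // 23505 is the unique-violation code used by, for example, PostgreSQL.
        SQLException duplicate = new SQLException("duplicate correlation id", "23505");
        // 08001 belongs to the connection-exception class, unrelated to constraints.
        SQLException connection = new SQLException("cannot connect", "08001");
        System.out.println(translate(duplicate).getClass().getSimpleName());
        System.out.println(translate(connection).getClass().getSimpleName());
    }
}
```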
