Posted to users@camel.apache.org by Michael Lück <mi...@hm-ag.de> on 2017/08/16 15:10:41 UTC

Race Condition in Aggregation using HazelcastAggregationRepository in a cluster

Hi there,

we just ran into a problem in one of our systems, and it looks like the
locking in the AggregateProcessor does not hold up in a distributed
environment.

I’ll try to explain it:

===================================================
Scenario
===================================================
	
We use camel-core and camel-hazelcast 2.16.5 and hazelcast 3.5.2.

We have a route which sends a message to a WebSphere MQ queue (via
JMSComponent). After that we put the message into an aggregator which uses
the JMSCorrelationID header to correlate the request and the response.

from(epAggregation)
  .aggregate(header("JMSCorrelationID"), new CustomAggregationStrategy())
  .completionTimeout(Integer.parseInt(
      getContext().resolvePropertyPlaceholders("{{timeout}}")))
  .completionSize(2)
  .aggregationRepository(aggrRepo)

The aggregationRepository aggrRepo is created like this:
  HazelcastAggregationRepository aggrRepo =
      new HazelcastAggregationRepository("aggrRepoDrsBatch", hcInst);
where hcInst is an instance of com.hazelcast.core.HazelcastInstance.

We also have another route which reads the response from the response queue
and forwards it to the aggregator.

The environment consists of two nodes running the same code (so essentially
the send and response routes and the aggregation).

The problem arises when the response comes back really fast and is consumed
on the node that didn't send the request.

========================================
Analysis
========================================

I dug a bit into the Camel code, and it seems to me that the problem is the
lock in the AggregateProcessor, because it is local to the VM in which the
code runs. I'll try to make this clearer with an example:

- Node A sends an MQ message and then puts the message into the
  aggregator. The AggregateProcessor runs and checks the lock before
  going into doAggregation().
    - In doAggregation() it tries to get the Exchange from the repository
      and doesn't find one. So it goes on to aggregate the first message
      and writes it into the repository.
- At about the same time, after Node A has read from the repository but
  before it has written the "aggregated" first message, Node B fetches
  the reply from the response queue and sends it to the aggregator. As on
  Node A, the lock is checked, but because the code runs in another VM
  the lock is free and the AggregateProcessor can go into doAggregation().
    - In doAggregation() Node B tries to get the Exchange from the
      repository before Node A has written it. So, like Node A, it
      proceeds to create the first Exchange for the aggregation and
      writes it into the repository.

The result is that one of the nodes overwrites the Exchange the other one
created before, and the aggregation never completes by size (it does
complete eventually, but only because of the timeout).
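
The interleaving described above can be replayed in a few lines of plain
Java. This is only a sketch under assumptions: it uses no Camel or Hazelcast
classes, the shared map merely stands in for the clustered repository, and
the names LostUpdateDemo, Node, beginAggregate and finishAggregate are made
up for illustration. Each Node owns its own lock, the way each JVM owns its
own AggregateProcessor lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

final class LostUpdateDemo {

    /** One "node": its lock is only visible inside this object, like a JVM-local lock. */
    static final class Node {
        private final ReentrantLock localLock = new ReentrantLock();
        private final Map<String, List<String>> sharedRepo;
        private List<String> snapshot;

        Node(Map<String, List<String>> sharedRepo) {
            this.sharedRepo = sharedRepo;
        }

        /** Take the local lock and read the current aggregate (empty if none yet). */
        void beginAggregate(String key) {
            localLock.lock();
            List<String> existing = sharedRepo.get(key);
            snapshot = new ArrayList<>();
            if (existing != null) {
                snapshot.addAll(existing);
            }
        }

        /** Add the message to the snapshot, write it back, release the local lock. */
        void finishAggregate(String key, String message) {
            try {
                snapshot.add(message);
                sharedRepo.put(key, snapshot);
            } finally {
                localLock.unlock();
            }
        }
    }

    /** Replays the problematic interleaving step by step and returns what is left in the repo. */
    static List<String> run() {
        Map<String, List<String>> sharedRepo = new ConcurrentHashMap<>();
        Node a = new Node(sharedRepo);
        Node b = new Node(sharedRepo);

        a.beginAggregate("corr-1");               // A reads: nothing stored yet
        b.beginAggregate("corr-1");               // B reads: still nothing, A's lock does not block B
        a.finishAggregate("corr-1", "request");   // A stores its "first" aggregate
        b.finishAggregate("corr-1", "response");  // B overwrites it with its own "first" aggregate
        return sharedRepo.get("corr-1");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Both locks are acquired without any waiting because they are different
objects. Only the response survives in the shared map, so a completionSize
of 2 is never reached for that correlation key.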

=========================================
Ideas to solve the problem
=========================================
- Optimistic locking is probably an option here, as
  HazelcastAggregationRepository supports it by implementing
  OptimisticLockingAggregationRepository.
=> I'd like to hear your thoughts on this.

- Currently we can stop the route consuming from the response queue on one
  node to eliminate the error. But this is not an option in the long run
  because we lose the ability to fail over.
- Perhaps the AggregateProcessor could get the lock object from the
  repository. For example, the HazelcastAggregationRepository could
  return the lock object for the Hazelcast map, which would lock it for
  the whole cluster.
- I thought about resending the MQ message in case of a timeout, but as the
  request has side effects on the system that processes the message, this
  is not really an option.
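
To illustrate the optimistic locking idea from the first bullet, here is a
minimal sketch in plain Java. It is not the Camel or Hazelcast
implementation: a ConcurrentMap stands in for the clustered map, and
OptimisticAddDemo and add are hypothetical names. The point is only that a
write succeeds when the entry is still in the state the writer read, and is
retried otherwise.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class OptimisticAddDemo {

    /**
     * Adds one message to the aggregate stored under key. The write is conditional
     * on the entry being unchanged since it was read; on a conflict the loop
     * re-reads and tries again.
     */
    static List<String> add(ConcurrentMap<String, List<String>> repo, String key, String message) {
        while (true) {
            List<String> seen = repo.get(key);
            List<String> updated = new ArrayList<>();
            if (seen != null) {
                updated.addAll(seen);
            }
            updated.add(message);
            List<String> frozen = Collections.unmodifiableList(updated);

            boolean stored;
            if (seen == null) {
                // Only succeeds if nobody created the entry in the meantime.
                stored = repo.putIfAbsent(key, frozen) == null;
            } else {
                // Only succeeds if the entry still equals what we read.
                stored = repo.replace(key, seen, frozen);
            }
            if (stored) {
                return frozen;
            }
        }
    }

    /** Replays the race: both nodes saw "no entry", but only one create can win. */
    static List<String> run() {
        ConcurrentMap<String, List<String>> repo = new ConcurrentHashMap<>();

        boolean aStored = repo.putIfAbsent("corr-1", Collections.singletonList("request")) == null;
        boolean bStored = repo.putIfAbsent("corr-1", Collections.singletonList("response")) == null;
        if (!aStored || bStored) {
            throw new IllegalStateException("expected A to win and B to be rejected");
        }

        // B's stale create was rejected instead of overwriting A; B retries on top of A's entry.
        return add(repo, "corr-1", "response");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this scheme the second writer notices that its view of the repository
was stale and aggregates onto the stored Exchange instead of replacing it.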

I hope I could make myself clear.
If you have any questions that would help you help me, I'd be happy to
answer them.

Regards,
Michael




Re: Race Condition in Aggregation using HazelcastAggregationRepository in a cluster

Posted by Michael Lück <mi...@hm-ag.de>.
Hi Zoran,

thanks for your reply. I added my notes below.

> -----Original Message-----
> From: Zoran Regvart [mailto:zoran@regvart.com]
> Sent: Sunday, 20 August 2017 00:52
> To: users@camel.apache.org
> Subject: Re: Race Condition in Aggregation using
> HazelcastAggregationRepository in a cluster
> 
> Hi Michael,
> it's a bit hard to follow so I could be misunderstanding your issue;
> is your issue that there is a race condition between the aggregator
> that expects the reply on node A and another aggregator that is not
> aware of the initial request on node B?

That's pretty close. But node A is not expecting the reply. It should not make a difference
whether the reply is consumed on node A or node B. That is why we used the
HazelcastAggregationRepository in the first place.

The problem is that the AggregateProcessor uses a synchronization lock so that other threads have to wait to access the repository.
But if the threads run on another node, and therefore in another JVM, that lock is not sufficient.
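
As a rough illustration of what a lock that is not tied to one JVM would
change, here is a plain-Java sketch. It is an assumption-laden stand-in, not
Camel or Hazelcast code: two threads play the two nodes, and the hypothetical
LockingRepo hands out the lock that guards each key, so both "nodes" contend
for the same lock object. In a real cluster the lock would have to come from
the cluster itself (Hazelcast's IMap offers per-key lock and unlock, for
instance).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class SharedLockDemo {

    /** Hypothetical repository that also owns the lock guarding each key. */
    static final class LockingRepo {
        private final Map<String, List<String>> data = new ConcurrentHashMap<>();
        private final Map<String, Lock> locks = new ConcurrentHashMap<>();

        Lock lockFor(String key) {
            return locks.computeIfAbsent(key, k -> new ReentrantLock());
        }

        List<String> get(String key) {
            return data.get(key);
        }

        void put(String key, List<String> value) {
            data.put(key, value);
        }
    }

    /** Read-modify-write under the lock obtained from the repository. */
    static void aggregate(LockingRepo repo, String key, String message) {
        Lock lock = repo.lockFor(key);
        lock.lock();
        try {
            List<String> existing = repo.get(key);
            List<String> updated = new ArrayList<>();
            if (existing != null) {
                updated.addAll(existing);
            }
            updated.add(message);
            repo.put(key, updated);
        } finally {
            lock.unlock();
        }
    }

    /** Runs both "nodes" concurrently and returns the final aggregate. */
    static List<String> run() {
        LockingRepo repo = new LockingRepo();
        Thread nodeA = new Thread(() -> aggregate(repo, "corr-1", "request"));
        Thread nodeB = new Thread(() -> aggregate(repo, "corr-1", "response"));
        nodeA.start();
        nodeB.start();
        try {
            nodeA.join();
            nodeB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return repo.get("corr-1");
    }

    public static void main(String[] args) {
        // The order of the two messages depends on thread scheduling.
        System.out.println(run());
    }
}
```

Whichever thread gets the lock second sees the first one's write, so the
aggregate always ends up with both messages.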

> 
> If you're doing only request-reply correlation perhaps take a look at
> InOut message exchange pattern with a correlation property[1] with the
> replying application setting the ReplyToQMgr to the requester's queue
> manager.

We need the aggregator to store a process instance ID from a BPMN process that uses the request route to send the message.
When the response comes back, we fetch this ID from the aggregated message in order to send the response to the correct process instance.
We decided against the InOut exchange pattern because the response would not be processed if the requestor's node fails.
Instead, we implemented the request-reply ourselves by using the JMSMessageID and JMSCorrelationID as the aggregation condition. Thanks to the HazelcastAggregationRepository, both nodes can handle any incoming message.

> 
> Or, place the reply in a Hazelcast queue regardless of the queue
> manager the reply landed on and process the reply from there.

That's a good idea. I'll have a look at this. 

> 
> Also I think that it would be better to setup the reply coordination
> expectation (with timeouts and without transactions -- that would
> block) before sending the message.
> 

Sorry, but I don't think I understood this. The message is sent without any transaction, and the timeout is implemented using the aggregation's timeout mechanism. We did it this way because we don't need any further coordination between the nodes, only the aggregation repository.

> 2c

By the way: I now use optimistic locking, but I had to fix it in the HazelcastAggregationRepository (see https://github.com/thuri/hazelcastrepository-optimistic-issue) to make it work. I'll send a PR in a few days.

Regards,
Michael

> 
> [1] https://camel.apache.org/correlation-identifier.html
> 


Re: Race Condition in Aggregation using HazelcastAggregationRepository in a cluster

Posted by Zoran Regvart <zo...@regvart.com>.
Hi Michael,
it's a bit hard to follow so I could be misunderstanding your issue;
is your issue that there is a race condition between the aggregator
that expects the reply on node A and another aggregator that is not
aware of the initial request on node B?

If you're doing only request-reply correlation perhaps take a look at
InOut message exchange pattern with a correlation property[1] with the
replying application setting the ReplyToQMgr to the requester's queue
manager.

Or, place the reply in a Hazelcast queue regardless of the queue
manager the reply landed on and process the reply from there.

Also I think that it would be better to set up the reply coordination
expectation (with timeouts and without transactions -- that would
block) before sending the message.

2c

[1] https://camel.apache.org/correlation-identifier.html




-- 
Zoran Regvart