You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Baris Acar <ba...@acar.org.uk> on 2013/09/18 02:49:12 UTC

Aggregator lock

Hi,

I'm seeing some surprising behaviour with my camel route, and was hoping
someone in this group could help, as my trawl through the docs and Camel In
Action book have not found the answers I'm looking for. Apologies if this
question has been clearly answered elsewhere :-/

I have a route that looks a little like the following:

    from("seda:foo?concurrentConsumers=2")
      .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
        .log("Sending out ${body} after a short pause...")
        .delay(3000) // simulate a lengthy process
        .log("Sending out ${body} imminently!")
        .to(...) // other downstream processing

Note that I'm using a SEDA with two *concurrent* consumers. I expected that
once a SEDA consumer thread has picked up a message that completes an
aggregation, that downstream processing will continue on that consumer
thread, whilst other such downstream processing for another 'completed
aggregation' message may be happening in parallel on the other SEDA
consumer thread.

What I'm finding instead is that whilst all of the work downstream of
aggregate() does occur across the two consumer threads, it is serialised;
no two threads execute the processors at the same time. This becomes quite
noticeable if this downstream work is lengthy. I've uploaded a sample to
https://github.com/bacar/aggregator-lock, which you can run with mvn test
-Dtest=AggregateLock. It started from a sample from the CIA book.

For example, you can see the whilst the second "Sending... after a short
pause" does occur on a separate thread (#2), it does not start until after
thread #1 has completed, despite the 3s delay():

2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO  route1 - Sending
out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO  route1 - Sending
out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO  route1 - Sending
out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO  route1 - Sending
out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!

Is this behaviour expected? I found it _very_ surprising. Did I miss
something in the docs that describes this behaviour? If the behaviour is
expected, I am happy to try adding some info to the documentation if
someone can explain the intent behind it.

I'm not terribly familiar with the code, but I've had a dig around, and it
looks like the reason for this behaviour is due to the following code
inside the process() method of
org.apache.camel.processor.aggregate.AggregateProcessor:

            lock.lock();
            try {
                doAggregation(key, copy);
            } finally {
                lock.unlock();
            }

The doAggregation() method performs both the aggregation (i.e., adding the
new exchange to the repository, checking if the completion criteria have
been met etc) _and_, if complete, submits the aggregated message to the
ExecutorService for downstream processing. However, since the default
executorService is the SynchronousExecutorService, all downstream
processing occurs synchronously with submission, and consequently, _within_
the lock above.

Whilst I can see obvious reasons that may make it necessary to perform the
actual aggregation inside a lock, I do find it quite surprising that the
downstream processing by default also occurs inside this lock. Are there
any other processors known to behave in this way, i.e., by taking a lock
around all downstream processing?

I could potentially work around this issue by dispensing with the SEDA
concurrentConsumers and using aggregate().parallelProcessing() instead,
with a suitable executorService() specified, but this introduces a number
of complications, e.g.:
- if I repeatedly split() and re-aggregate() (by different criteria), then
_every time_ I aggregate I have to add
parallelProcessing()/executorService(); this is verbose and error prone.
- with repeated aggregates in a route, I need dedicated threads/pools per
aggregate(), which means way more threads than I really want/need.
- regardless, I don't get the predictable and simple behaviour I expected
of 'pick up job from SEDA, aggregate, synchronously process downstream
jobs' that I'd expected.

Another possible workaround might be the optimistic locking, but I haven't
had the opportunity to study it yet. It seems unrelated - I think my
problem is with the very coarse granularity of the pessimistic lock, not
with whether it's optimistic. Plus, I don't really want my messages to ever
fail with a 'too many attempts to acquire the optimistic lock' exception,
and I might have quite high contention).

Many thanks in advance for your help/comments!

Baris.

Re: Aggregator lock

Posted by Baris Acar <ba...@acar.org.uk>.
Great, thanks Claus. I've attached a trivial unit test to the Jira issue showing behaviour that should pass following the fix, which you're welcome to include/adapt if it's useful. 

Barış

> On 21 Sep 2013, at 11:34, Claus Ibsen <cl...@gmail.com> wrote:
> 
> Hi
> 
> Yeah the sending to the thread pool could potential happen outside the
> lock. I have logged a ticket
> https://issues.apache.org/jira/browse/CAMEL-6775
> 
> It does complicated it a bit in the logic as there is potential also
> timeout and recovery tasks that operate as well.
> 
>> On Wed, Sep 18, 2013 at 7:37 PM, bacar <ba...@acar.org.uk> wrote:
>> Just to add yet another layer of clarity (I hope this is helping...)
>> 
>>            lock.lock();
>>            try {
>>                doAggregation(key, copy);
>>            } finally {
>>                lock.unlock();
>>            }
>> 
>> Taking a *very* simplistic view, doAggregation() does two things:
>> 1. combine messages together into an aggregated message, interact with
>> aggregation repo etc.
>> 2. if "complete", send the aggregated message to the downstream processor.
>> 
>> If we were call these tasks "performAggregation()" and "sendDownstream()"
>> then all I'm suggesting is that a refactoring in the following direction
>> seems like it would make far more sense:
>> 
>>            lock.lock();
>>            try {
>>                performAggregation(key, copy);
>>            } finally {
>>                lock.unlock();
>>            }
>>            if(completed) {
>>                sendDownstream(...);
>>            }
>> 
>> sendDownstream() would use the existing code to submit to an
>> executorService, which would be synchronous by default still. That way,
>> behaviour is unchanged except that *downstream processing no longer happens
>> inside the lock*.
>> 
>> I'm not, of course suggesting it's this trivial, there are several
>> complications in getting to this - this is just an outline.
>> 
>> Thanks
>> Baris.
>> 
>> 
>> 
>> --
>> View this message in context: http://camel.465427.n5.nabble.com/Aggregator-lock-tp5739692p5739771.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
> 
> 
> 
> -- 
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen

Re: Aggregator lock

Posted by bacar <ba...@acar.org.uk>.
Yes, I considered this as a temporary workaround but it doesn't really suit
our case; optimistic locking ought to work well when you don't expect a lot
of contention; unfortunately I expect large batches of tens of thousands of
sequential messages that I expect to get batched together into small groups.

However I'll definitely profile it / compare it to the other workaround of
introducing a thread pool.



--
View this message in context: http://camel.465427.n5.nabble.com/Aggregator-lock-tp5739692p5740053.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Aggregator lock

Posted by Claus Ibsen <cl...@gmail.com>.
You can use optimisticLocking which wont use the lock.


On Sat, Sep 21, 2013 at 12:34 PM, Claus Ibsen <cl...@gmail.com> wrote:
> Hi
>
> Yeah the sending to the thread pool could potential happen outside the
> lock. I have logged a ticket
> https://issues.apache.org/jira/browse/CAMEL-6775
>
> It does complicated it a bit in the logic as there is potential also
> timeout and recovery tasks that operate as well.
>
> On Wed, Sep 18, 2013 at 7:37 PM, bacar <ba...@acar.org.uk> wrote:
>> Just to add yet another layer of clarity (I hope this is helping...)
>>
>>             lock.lock();
>>             try {
>>                 doAggregation(key, copy);
>>             } finally {
>>                 lock.unlock();
>>             }
>>
>> Taking a *very* simplistic view, doAggregation() does two things:
>> 1. combine messages together into an aggregated message, interact with
>> aggregation repo etc.
>> 2. if "complete", send the aggregated message to the downstream processor.
>>
>> If we were call these tasks "performAggregation()" and "sendDownstream()"
>> then all I'm suggesting is that a refactoring in the following direction
>> seems like it would make far more sense:
>>
>>             lock.lock();
>>             try {
>>                 performAggregation(key, copy);
>>             } finally {
>>                 lock.unlock();
>>             }
>>             if(completed) {
>>                 sendDownstream(...);
>>             }
>>
>> sendDownstream() would use the existing code to submit to an
>> executorService, which would be synchronous by default still. That way,
>> behaviour is unchanged except that *downstream processing no longer happens
>> inside the lock*.
>>
>> I'm not, of course suggesting it's this trivial, there are several
>> complications in getting to this - this is just an outline.
>>
>> Thanks
>> Baris.
>>
>>
>>
>> --
>> View this message in context: http://camel.465427.n5.nabble.com/Aggregator-lock-tp5739692p5739771.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Aggregator lock

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Yeah the sending to the thread pool could potential happen outside the
lock. I have logged a ticket
https://issues.apache.org/jira/browse/CAMEL-6775

It does complicated it a bit in the logic as there is potential also
timeout and recovery tasks that operate as well.

On Wed, Sep 18, 2013 at 7:37 PM, bacar <ba...@acar.org.uk> wrote:
> Just to add yet another layer of clarity (I hope this is helping...)
>
>             lock.lock();
>             try {
>                 doAggregation(key, copy);
>             } finally {
>                 lock.unlock();
>             }
>
> Taking a *very* simplistic view, doAggregation() does two things:
> 1. combine messages together into an aggregated message, interact with
> aggregation repo etc.
> 2. if "complete", send the aggregated message to the downstream processor.
>
> If we were call these tasks "performAggregation()" and "sendDownstream()"
> then all I'm suggesting is that a refactoring in the following direction
> seems like it would make far more sense:
>
>             lock.lock();
>             try {
>                 performAggregation(key, copy);
>             } finally {
>                 lock.unlock();
>             }
>             if(completed) {
>                 sendDownstream(...);
>             }
>
> sendDownstream() would use the existing code to submit to an
> executorService, which would be synchronous by default still. That way,
> behaviour is unchanged except that *downstream processing no longer happens
> inside the lock*.
>
> I'm not, of course suggesting it's this trivial, there are several
> complications in getting to this - this is just an outline.
>
> Thanks
> Baris.
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Aggregator-lock-tp5739692p5739771.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Aggregator lock

Posted by bacar <ba...@acar.org.uk>.
Just to add yet another layer of clarity (I hope this is helping...)

            lock.lock(); 
            try { 
                doAggregation(key, copy); 
            } finally { 
                lock.unlock(); 
            } 

Taking a *very* simplistic view, doAggregation() does two things:
1. combine messages together into an aggregated message, interact with
aggregation repo etc.
2. if "complete", send the aggregated message to the downstream processor. 

If we were call these tasks "performAggregation()" and "sendDownstream()"
then all I'm suggesting is that a refactoring in the following direction
seems like it would make far more sense:

            lock.lock(); 
            try { 
                performAggregation(key, copy); 
            } finally { 
                lock.unlock(); 
            } 
            if(completed) {
                sendDownstream(...);
            } 

sendDownstream() would use the existing code to submit to an
executorService, which would be synchronous by default still. That way,
behaviour is unchanged except that *downstream processing no longer happens
inside the lock*.

I'm not, of course suggesting it's this trivial, there are several
complications in getting to this - this is just an outline.

Thanks
Baris.



--
View this message in context: http://camel.465427.n5.nabble.com/Aggregator-lock-tp5739692p5739771.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Aggregator lock

Posted by Baris Acar <ba...@acar.org.uk>.
Thanks Claus. It is still verbose/error prone - I have to repeat it every time I use such a processor (verbose) and I have to remember to do it (error prone). And that assumes I *know* that I have to do it in the first place - I don't know how I can tell whether a given processor holds a lock around all downstream processing without detailed inspection of the source. 

Here's an extreme example to illustrate what I mean:

from("direct:a")
  .threads(5)       // set up a thread pool
  .bean(dostuff)    // excellent, stuff is done in parallel on 5 threads - happy days.
  .aggregate(...)   // I'm a good dev and remembered to RE-introduce a threadpool using executorService, otherwise camel will not execute any of the following in the thread pool I made above. 
    .executorService(...)
  .multicast(...)   // does multicast() acquire a big lock on downstream work just like aggregate?
    .executorService(...)  // (...better process downstream on another executor service to be sure)
  .delay(1000)      // maybe delay will lock too? Add another executorService!
    .executorService(...)
  .log("got a msg") // how about now? Maybe log() locks all downstream threads too just like aggregate! Who knows?
    .threads(5)     // shall I just reintroduce the threadpool on every other line, just to be absolutely sure?
  .someOtherBuiltInCamelProcessor()
    .threads(5)     // ... if not, how can I know *when* I need to reintroduce it? Camel may arbitrarily lock me out!
  // etc

How can I reason about my system's behaviour without knowing which parts of my code will be executed under a seemingly arbitrary mutual exclusion lock? Which processors take this lock, and why?

Again, thanks for your time. 

Barış

Re: Aggregator lock

Posted by Claus Ibsen <cl...@gmail.com>.
And you just need to set executorService and refer to a thread pool to
be shared / or a thread pool profile as template for settings.
http://camel.apache.org/threading-model.html

That is one option to configure which is not verbose or error prone!

On Wed, Sep 18, 2013 at 5:21 PM, Claus Ibsen <cl...@gmail.com> wrote:
> It works as designed.
>
> If you want aggregator to use concurrent threads for downstream then
> you need to configure that.
>
> On Wed, Sep 18, 2013 at 5:15 PM, Baris Acar <ba...@acar.org.uk> wrote:
>> Hi Claus,
>>
>> Thanks for your reply! I've tried using parallelProcessing and it comes with a few drawbacks as I've mentioned already. We're going with it as a workaround but I'm interested to know whether you consider the issue I've reported to be a bug.
>>
>> Do you believe that it is intentional/expected that by default the AggregateProcessor *holds a mutual exclusion lock* across all downstream processing, by default? It's really very unexpected to me, and the docs you link to make no mention of acquiring a lock over other code unrelated to the aggregation. As a user of a framework, one needs to know if a framework is going to acquire a mutual exclusion lock over my code, in order to reason about the parallelism.
>>
>> Importantly - are there any other processors which acquire a lock over all downstream processing?
>>
>> Barış
>>
>>
>> On 18 Sep 2013, at 11:54, Claus Ibsen <cl...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> See the parallelProcessing / executorService option on the aggregator
>>> http://camel.apache.org/aggregator2
>>>
>>> On Wed, Sep 18, 2013 at 2:49 AM, Baris Acar <ba...@acar.org.uk> wrote:
>>>> Hi,
>>>>
>>>> I'm seeing some surprising behaviour with my camel route, and was hoping
>>>> someone in this group could help, as my trawl through the docs and Camel In
>>>> Action book have not found the answers I'm looking for. Apologies if this
>>>> question has been clearly answered elsewhere :-/
>>>>
>>>> I have a route that looks a little like the following:
>>>>
>>>>    from("seda:foo?concurrentConsumers=2")
>>>>      .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
>>>>        .log("Sending out ${body} after a short pause...")
>>>>        .delay(3000) // simulate a lengthy process
>>>>        .log("Sending out ${body} imminently!")
>>>>        .to(...) // other downstream processing
>>>>
>>>> Note that I'm using a SEDA with two *concurrent* consumers. I expected that
>>>> once a SEDA consumer thread has picked up a message that completes an
>>>> aggregation, that downstream processing will continue on that consumer
>>>> thread, whilst other such downstream processing for another 'completed
>>>> aggregation' message may be happening in parallel on the other SEDA
>>>> consumer thread.
>>>>
>>>> What I'm finding instead is that whilst all of the work downstream of
>>>> aggregate() does occur across the two consumer threads, it is serialised;
>>>> no two threads execute the processors at the same time. This becomes quite
>>>> noticeable if this downstream work is lengthy. I've uploaded a sample to
>>>> https://github.com/bacar/aggregator-lock, which you can run with mvn test
>>>> -Dtest=AggregateLock. It started from a sample from the CIA book.
>>>>
>>>> For example, you can see the whilst the second "Sending... after a short
>>>> pause" does occur on a separate thread (#2), it does not start until after
>>>> thread #1 has completed, despite the 3s delay():
>>>>
>>>> 2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO  route1 - Sending
>>>> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
>>>> 2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO  route1 - Sending
>>>> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
>>>> 2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO  route1 - Sending
>>>> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
>>>> 2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO  route1 - Sending
>>>> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!
>>>>
>>>> Is this behaviour expected? I found it _very_ surprising. Did I miss
>>>> something in the docs that describes this behaviour? If the behaviour is
>>>> expected, I am happy to try adding some info to the documentation if
>>>> someone can explain the intent behind it.
>>>>
>>>> I'm not terribly familiar with the code, but I've had a dig around, and it
>>>> looks like the reason for this behaviour is due to the following code
>>>> inside the process() method of
>>>> org.apache.camel.processor.aggregate.AggregateProcessor:
>>>>
>>>>            lock.lock();
>>>>            try {
>>>>                doAggregation(key, copy);
>>>>            } finally {
>>>>                lock.unlock();
>>>>            }
>>>>
>>>> The doAggregation() method performs both the aggregation (i.e., adding the
>>>> new exchange to the repository, checking if the completion criteria have
>>>> been met etc) _and_, if complete, submits the aggregated message to the
>>>> ExecutorService for downstream processing. However, since the default
>>>> executorService is the SynchronousExecutorService, all downstream
>>>> processing occurs synchronously with submission, and consequently, _within_
>>>> the lock above.
>>>>
>>>> Whilst I can see obvious reasons that may make it necessary to perform the
>>>> actual aggregation inside a lock, I do find it quite surprising that the
>>>> downstream processing by default also occurs inside this lock. Are there
>>>> any other processors known to behave in this way, i.e., by taking a lock
>>>> around all downstream processing?
>>>>
>>>> I could potentially work around this issue by dispensing with the SEDA
>>>> concurrentConsumers and using aggregate().parallelProcessing() instead,
>>>> with a suitable executorService() specified, but this introduces a number
>>>> of complications, e.g.:
>>>> - if I repeatedly split() and re-aggregate() (by different criteria), then
>>>> _every time_ I aggregate I have to add
>>>> parallelProcessing()/executorService(); this is verbose and error prone.
>>>> - with repeated aggregates in a route, I need dedicated threads/pools per
>>>> aggregate(), which means way more threads than I really want/need.
>>>> - regardless, I don't get the predictable and simple behaviour I expected
>>>> of 'pick up job from SEDA, aggregate, synchronously process downstream
>>>> jobs' that I'd expected.
>>>>
>>>> Another possible workaround might be the optimistic locking, but I haven't
>>>> had the opportunity to study it yet. It seems unrelated - I think my
>>>> problem is with the very coarse granularity of the pessimistic lock, not
>>>> with whether it's optimistic. Plus, I don't really want my messages to ever
>>>> fail with a 'too many attempts to acquire the optimistic lock' exception,
>>>> and I might have quite high contention).
>>>>
>>>> Many thanks in advance for your help/comments!
>>>>
>>>> Baris.
>>>
>>>
>>>
>>> --
>>> Claus Ibsen
>>> -----------------
>>> Red Hat, Inc.
>>> Email: cibsen@redhat.com
>>> Twitter: davsclaus
>>> Blog: http://davsclaus.com
>>> Author of Camel in Action: http://www.manning.com/ibsen
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Aggregator lock

Posted by Claus Ibsen <cl...@gmail.com>.
It works as designed.

If you want aggregator to use concurrent threads for downstream then
you need to configure that.

On Wed, Sep 18, 2013 at 5:15 PM, Baris Acar <ba...@acar.org.uk> wrote:
> Hi Claus,
>
> Thanks for your reply! I've tried using parallelProcessing and it comes with a few drawbacks as I've mentioned already. We're going with it as a workaround but I'm interested to know whether you consider the issue I've reported to be a bug.
>
> Do you believe that it is intentional/expected that by default the AggregateProcessor *holds a mutual exclusion lock* across all downstream processing, by default? It's really very unexpected to me, and the docs you link to make no mention of acquiring a lock over other code unrelated to the aggregation. As a user of a framework, one needs to know if a framework is going to acquire a mutual exclusion lock over my code, in order to reason about the parallelism.
>
> Importantly - are there any other processors which acquire a lock over all downstream processing?
>
> Barış
>
>
> On 18 Sep 2013, at 11:54, Claus Ibsen <cl...@gmail.com> wrote:
>
>> Hi
>>
>> See the parallelProcessing / executorService option on the aggregator
>> http://camel.apache.org/aggregator2
>>
>> On Wed, Sep 18, 2013 at 2:49 AM, Baris Acar <ba...@acar.org.uk> wrote:
>>> Hi,
>>>
>>> I'm seeing some surprising behaviour with my camel route, and was hoping
>>> someone in this group could help, as my trawl through the docs and Camel In
>>> Action book have not found the answers I'm looking for. Apologies if this
>>> question has been clearly answered elsewhere :-/
>>>
>>> I have a route that looks a little like the following:
>>>
>>>    from("seda:foo?concurrentConsumers=2")
>>>      .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
>>>        .log("Sending out ${body} after a short pause...")
>>>        .delay(3000) // simulate a lengthy process
>>>        .log("Sending out ${body} imminently!")
>>>        .to(...) // other downstream processing
>>>
>>> Note that I'm using a SEDA with two *concurrent* consumers. I expected that
>>> once a SEDA consumer thread has picked up a message that completes an
>>> aggregation, that downstream processing will continue on that consumer
>>> thread, whilst other such downstream processing for another 'completed
>>> aggregation' message may be happening in parallel on the other SEDA
>>> consumer thread.
>>>
>>> What I'm finding instead is that whilst all of the work downstream of
>>> aggregate() does occur across the two consumer threads, it is serialised;
>>> no two threads execute the processors at the same time. This becomes quite
>>> noticeable if this downstream work is lengthy. I've uploaded a sample to
>>> https://github.com/bacar/aggregator-lock, which you can run with mvn test
>>> -Dtest=AggregateLock. It started from a sample from the CIA book.
>>>
>>> For example, you can see the whilst the second "Sending... after a short
>>> pause" does occur on a separate thread (#2), it does not start until after
>>> thread #1 has completed, despite the 3s delay():
>>>
>>> 2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO  route1 - Sending
>>> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
>>> 2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO  route1 - Sending
>>> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
>>> 2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO  route1 - Sending
>>> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
>>> 2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO  route1 - Sending
>>> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!
>>>
>>> Is this behaviour expected? I found it _very_ surprising. Did I miss
>>> something in the docs that describes this behaviour? If the behaviour is
>>> expected, I am happy to try adding some info to the documentation if
>>> someone can explain the intent behind it.
>>>
>>> I'm not terribly familiar with the code, but I've had a dig around, and it
>>> looks like the reason for this behaviour is due to the following code
>>> inside the process() method of
>>> org.apache.camel.processor.aggregate.AggregateProcessor:
>>>
>>>            lock.lock();
>>>            try {
>>>                doAggregation(key, copy);
>>>            } finally {
>>>                lock.unlock();
>>>            }
>>>
>>> The doAggregation() method performs both the aggregation (i.e., adding the
>>> new exchange to the repository, checking if the completion criteria have
>>> been met etc) _and_, if complete, submits the aggregated message to the
>>> ExecutorService for downstream processing. However, since the default
>>> executorService is the SynchronousExecutorService, all downstream
>>> processing occurs synchronously with submission, and consequently, _within_
>>> the lock above.
>>>
>>> Whilst I can see obvious reasons that may make it necessary to perform the
>>> actual aggregation inside a lock, I do find it quite surprising that the
>>> downstream processing by default also occurs inside this lock. Are there
>>> any other processors known to behave in this way, i.e., by taking a lock
>>> around all downstream processing?
>>>
>>> I could potentially work around this issue by dispensing with the SEDA
>>> concurrentConsumers and using aggregate().parallelProcessing() instead,
>>> with a suitable executorService() specified, but this introduces a number
>>> of complications, e.g.:
>>> - if I repeatedly split() and re-aggregate() (by different criteria), then
>>> _every time_ I aggregate I have to add
>>> parallelProcessing()/executorService(); this is verbose and error prone.
>>> - with repeated aggregates in a route, I need dedicated threads/pools per
>>> aggregate(), which means way more threads than I really want/need.
>>> - regardless, I don't get the predictable and simple behaviour I expected
>>> of 'pick up job from SEDA, aggregate, synchronously process downstream
>>> jobs' that I'd expected.
>>>
>>> Another possible workaround might be the optimistic locking, but I haven't
>>> had the opportunity to study it yet. It seems unrelated - I think my
>>> problem is with the very coarse granularity of the pessimistic lock, not
>>> with whether it's optimistic. Plus, I don't really want my messages to ever
>>> fail with a 'too many attempts to acquire the optimistic lock' exception,
>>> and I might have quite high contention).
>>>
>>> Many thanks in advance for your help/comments!
>>>
>>> Baris.
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> Red Hat, Inc.
>> Email: cibsen@redhat.com
>> Twitter: davsclaus
>> Blog: http://davsclaus.com
>> Author of Camel in Action: http://www.manning.com/ibsen



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Aggregator lock

Posted by Baris Acar <ba...@acar.org.uk>.
Hi Claus,

Thanks for your reply! I've tried using parallelProcessing and it comes with a few drawbacks as I've mentioned already. We're going with it as a workaround but I'm interested to know whether you consider the issue I've reported to be a bug.

Do you believe that it is intentional/expected that by default the AggregateProcessor *holds a mutual exclusion lock* across all downstream processing, by default? It's really very unexpected to me, and the docs you link to make no mention of acquiring a lock over other code unrelated to the aggregation. As a user of a framework, one needs to know if a framework is going to acquire a mutual exclusion lock over my code, in order to reason about the parallelism. 

Importantly - are there any other processors which acquire a lock over all downstream processing?

Barış


On 18 Sep 2013, at 11:54, Claus Ibsen <cl...@gmail.com> wrote:

> Hi
> 
> See the parallelProcessing / executorService option on the aggregator
> http://camel.apache.org/aggregator2
> 
> On Wed, Sep 18, 2013 at 2:49 AM, Baris Acar <ba...@acar.org.uk> wrote:
>> Hi,
>> 
>> I'm seeing some surprising behaviour with my camel route, and was hoping
>> someone in this group could help, as my trawl through the docs and Camel In
>> Action book have not found the answers I'm looking for. Apologies if this
>> question has been clearly answered elsewhere :-/
>> 
>> I have a route that looks a little like the following:
>> 
>>    from("seda:foo?concurrentConsumers=2")
>>      .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
>>        .log("Sending out ${body} after a short pause...")
>>        .delay(3000) // simulate a lengthy process
>>        .log("Sending out ${body} imminently!")
>>        .to(...) // other downstream processing
>> 
>> Note that I'm using a SEDA with two *concurrent* consumers. I expected that
>> once a SEDA consumer thread has picked up a message that completes an
>> aggregation, that downstream processing will continue on that consumer
>> thread, whilst other such downstream processing for another 'completed
>> aggregation' message may be happening in parallel on the other SEDA
>> consumer thread.
>> 
>> What I'm finding instead is that whilst all of the work downstream of
>> aggregate() does occur across the two consumer threads, it is serialised;
>> no two threads execute the processors at the same time. This becomes quite
>> noticeable if this downstream work is lengthy. I've uploaded a sample to
>> https://github.com/bacar/aggregator-lock, which you can run with mvn test
>> -Dtest=AggregateLock. It started from a sample from the CIA book.
>> 
>> For example, you can see the whilst the second "Sending... after a short
>> pause" does occur on a separate thread (#2), it does not start until after
>> thread #1 has completed, despite the 3s delay():
>> 
>> 2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO  route1 - Sending
>> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
>> 2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO  route1 - Sending
>> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
>> 2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO  route1 - Sending
>> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
>> 2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO  route1 - Sending
>> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!
>> 
>> Is this behaviour expected? I found it _very_ surprising. Did I miss
>> something in the docs that describes this behaviour? If the behaviour is
>> expected, I am happy to try adding some info to the documentation if
>> someone can explain the intent behind it.
>> 
>> I'm not terribly familiar with the code, but I've had a dig around, and it
>> looks like the reason for this behaviour is due to the following code
>> inside the process() method of
>> org.apache.camel.processor.aggregate.AggregateProcessor:
>> 
>>            lock.lock();
>>            try {
>>                doAggregation(key, copy);
>>            } finally {
>>                lock.unlock();
>>            }
>> 
>> The doAggregation() method performs both the aggregation (i.e., adding the
>> new exchange to the repository, checking if the completion criteria have
>> been met etc) _and_, if complete, submits the aggregated message to the
>> ExecutorService for downstream processing. However, since the default
>> executorService is the SynchronousExecutorService, all downstream
>> processing occurs synchronously with submission, and consequently, _within_
>> the lock above.
>> 
>> Whilst I can see obvious reasons that may make it necessary to perform the
>> actual aggregation inside a lock, I do find it quite surprising that the
>> downstream processing by default also occurs inside this lock. Are there
>> any other processors known to behave in this way, i.e., by taking a lock
>> around all downstream processing?
>> 
>> I could potentially work around this issue by dispensing with the SEDA
>> concurrentConsumers and using aggregate().parallelProcessing() instead,
>> with a suitable executorService() specified, but this introduces a number
>> of complications, e.g.:
>> - if I repeatedly split() and re-aggregate() (by different criteria), then
>> _every time_ I aggregate I have to add
>> parallelProcessing()/executorService(); this is verbose and error prone.
>> - with repeated aggregates in a route, I need dedicated threads/pools per
>> aggregate(), which means way more threads than I really want/need.
>> - regardless, I don't get the predictable and simple behaviour I expected
>> of 'pick up job from SEDA, aggregate, synchronously process downstream
>> jobs' that I'd expected.
>> 
>> Another possible workaround might be the optimistic locking, but I haven't
>> had the opportunity to study it yet. It seems unrelated - I think my
>> problem is with the very coarse granularity of the pessimistic lock, not
>> with whether it's optimistic. Plus, I don't really want my messages to ever
>> fail with a 'too many attempts to acquire the optimistic lock' exception,
>> and I might have quite high contention).
>> 
>> Many thanks in advance for your help/comments!
>> 
>> Baris.
> 
> 
> 
> -- 
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen

Re: Aggregator lock

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

See the parallelProcessing / executorService option on the aggregator
http://camel.apache.org/aggregator2

On Wed, Sep 18, 2013 at 2:49 AM, Baris Acar <ba...@acar.org.uk> wrote:
> Hi,
>
> I'm seeing some surprising behaviour with my camel route, and was hoping
> someone in this group could help, as my trawl through the docs and Camel In
> Action book have not found the answers I'm looking for. Apologies if this
> question has been clearly answered elsewhere :-/
>
> I have a route that looks a little like the following:
>
>     from("seda:foo?concurrentConsumers=2")
>       .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
>         .log("Sending out ${body} after a short pause...")
>         .delay(3000) // simulate a lengthy process
>         .log("Sending out ${body} imminently!")
>         .to(...) // other downstream processing
>
> Note that I'm using a SEDA with two *concurrent* consumers. I expected that
> once a SEDA consumer thread has picked up a message that completes an
> aggregation, that downstream processing will continue on that consumer
> thread, whilst other such downstream processing for another 'completed
> aggregation' message may be happening in parallel on the other SEDA
> consumer thread.
>
> What I'm finding instead is that whilst all of the work downstream of
> aggregate() does occur across the two consumer threads, it is serialised;
> no two threads execute the processors at the same time. This becomes quite
> noticeable if this downstream work is lengthy. I've uploaded a sample to
> https://github.com/bacar/aggregator-lock, which you can run with mvn test
> -Dtest=AggregateLock. It started from a sample from the CIA book.
>
> For example, you can see the whilst the second "Sending... after a short
> pause" does occur on a separate thread (#2), it does not start until after
> thread #1 has completed, despite the 3s delay():
>
> 2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO  route1 - Sending
> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
> 2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO  route1 - Sending
> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
> 2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO  route1 - Sending
> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
> 2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO  route1 - Sending
> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!
>
> Is this behaviour expected? I found it _very_ surprising. Did I miss
> something in the docs that describes this behaviour? If the behaviour is
> expected, I am happy to try adding some info to the documentation if
> someone can explain the intent behind it.
>
> I'm not terribly familiar with the code, but I've had a dig around, and it
> looks like the reason for this behaviour is due to the following code
> inside the process() method of
> org.apache.camel.processor.aggregate.AggregateProcessor:
>
>             lock.lock();
>             try {
>                 doAggregation(key, copy);
>             } finally {
>                 lock.unlock();
>             }
>
> The doAggregation() method performs both the aggregation (i.e., adding the
> new exchange to the repository, checking if the completion criteria have
> been met etc) _and_, if complete, submits the aggregated message to the
> ExecutorService for downstream processing. However, since the default
> executorService is the SynchronousExecutorService, all downstream
> processing occurs synchronously with submission, and consequently, _within_
> the lock above.
>
> Whilst I can see obvious reasons that may make it necessary to perform the
> actual aggregation inside a lock, I do find it quite surprising that the
> downstream processing by default also occurs inside this lock. Are there
> any other processors known to behave in this way, i.e., by taking a lock
> around all downstream processing?
>
> I could potentially work around this issue by dispensing with the SEDA
> concurrentConsumers and using aggregate().parallelProcessing() instead,
> with a suitable executorService() specified, but this introduces a number
> of complications, e.g.:
> - if I repeatedly split() and re-aggregate() (by different criteria), then
> _every time_ I aggregate I have to add
> parallelProcessing()/executorService(); this is verbose and error prone.
> - with repeated aggregates in a route, I need dedicated threads/pools per
> aggregate(), which means way more threads than I really want/need.
> - regardless, I don't get the predictable and simple behaviour I expected
> of 'pick up job from SEDA, aggregate, synchronously process downstream
> jobs' that I'd expected.
>
> Another possible workaround might be the optimistic locking, but I haven't
> had the opportunity to study it yet. It seems unrelated - I think my
> problem is with the very coarse granularity of the pessimistic lock, not
> with whether it's optimistic. Plus, I don't really want my messages to ever
> fail with a 'too many attempts to acquire the optimistic lock' exception,
> and I might have quite high contention).
>
> Many thanks in advance for your help/comments!
>
> Baris.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen