You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by khein <kh...@vsciences.com> on 2010/11/29 23:41:55 UTC

concurrentConsumers issue

I'm seeing an issue with concurrent consumption of JMS messages.  I've
written a simple test which demonstrates what is occurring.  It seems that
threads are NOT being assigned to waiting consumers as they become free,
they are being assigned based on the order they were originally called in.

Simple example (with concurrentConsumers=3, 5 concurrent requests are send
initially):  

1st consumer/thread starts and sleeps for 10 seconds, completes.
2nd consumer/thread starts and sleeps for 5 seconds, completes.
3rd consumer/thread starts and sleeps for 3 seconds, completes.
4th REQUEST waits for the 1st consumer/thread to complete before being
processed
5th REQUEST waits for the 2nd consumer/thread to complete

I'm using Camel 2.5, ActiveMQ 5.4.1, Spring 3.0.4.  Following is a log
showing the behavior, followed by my Camel configuration.

This seems pretty basic, so I'm sure I'm just missing something.

--- --- ---

08:45:03,864 | INFO  | ad-1 | uniworks.core.ThreadTester                                      
| ThreadTester:  Sending Requests
08:45:03,904 | INFO  | er-8 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 15000
08:45:03,919 | INFO  | er-7 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 12000
08:45:03,949 | INFO  | er-3 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 11000
08:45:03,968 | INFO  | er-2 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 10000
08:45:03,990 | INFO  | er-4 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 9000
08:45:04,017 | INFO  | r-10 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 8000
08:45:04,038 | INFO  | er-1 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 7000
08:45:04,056 | INFO  | er-5 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 6000
08:45:04,084 | INFO  | er-6 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 5000
08:45:04,106 | INFO  | er-9 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 4000
08:45:04,160 | INFO  | ad-1 | uniworks.core.ThreadTester                                      
| ThreadTester:  All Requests Sent
08:45:08,107 | INFO  | er-9 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 4000
08:45:09,085 | INFO  | er-6 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 5000
08:45:10,056 | INFO  | er-5 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 6000
08:45:11,038 | INFO  | er-1 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 7000
08:45:12,017 | INFO  | r-10 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 8000
08:45:12,990 | INFO  | er-4 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 9000
08:45:13,968 | INFO  | er-2 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 10000
08:45:14,949 | INFO  | er-3 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 11000
08:45:14,954 | INFO  | er-3 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 1000
08:45:15,919 | INFO  | er-7 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 12000
08:45:15,924 | INFO  | er-7 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 2000
08:45:15,954 | INFO  | er-3 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 1000
08:45:17,924 | INFO  | er-7 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 2000
08:45:18,904 | INFO  | er-8 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 15000
08:45:18,908 | INFO  | er-8 | uniworks.core.ThreadTestServiceImpl                             
| Starting Thread:  duration of pause = 3000
08:45:21,909 | INFO  | er-8 | uniworks.core.ThreadTestServiceImpl                             
| Ending Thread:    duration of pause = 3000

--- --- ---

    <camel:route>
      <camel:from
uri="jms:threadTestService?maxConcurrentConsumers=10&amp;concurrentConsumers=8"/>
      <camel:to uri="bean:threadTestServiceImpl"/>
    </camel:route>

  <bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${jms.url}"/>
  </bean>

  <bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="maxConnections" value="25"/>
    <property name="maximumActive" value="500"/>
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
  </bean>


  <bean id="jmsConfig"
class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="transacted" value="false"/>
    <property name="concurrentConsumers" value="8"/>
  </bean>

  <bean id="jms"
class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig"/>
  </bean>
-- 
View this message in context: http://camel.465427.n5.nabble.com/concurrentConsumers-issue-tp3285351p3285351.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: concurrentConsumers issue

Posted by khein <kh...@vsciences.com>.
I've reproduced this issue with just the Spring
DefaultMessageListenerContainer.  Will work the issue through the Spring
guys.  Thanks for the quick responses.
-- 
View this message in context: http://camel.465427.n5.nabble.com/concurrentConsumers-issue-tp3285351p3287161.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: concurrentConsumers issue

Posted by khein <kh...@vsciences.com>.
I've used the Spring JMS listeners many times and never encountered this. 
I'll convert my test to using Spring JMS to verify.

This is major issue  because of the unreliability it causes.  Basically a
long running thread (or one that transactionally binds, hangs, etc) delays
processing of other downstream messages.  In a 4 consumer pool, if the 1st
message receives causes long processing, it delays the 5th, 9th, 13th
message, while the others process in a more timely manner.
-- 
View this message in context: http://camel.465427.n5.nabble.com/concurrentConsumers-issue-tp3285351p3286723.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: concurrentConsumers issue

Posted by Claus Ibsen <cl...@gmail.com>.
camel-jms uses Spring JMS under the covers. So the concurrent
consumers works the way it does in Spring JMS.


On Mon, Nov 29, 2010 at 11:41 PM, khein <kh...@vsciences.com> wrote:
>
> I'm seeing an issue with concurrent consumption of JMS messages.  I've
> written a simple test which demonstrates what is occurring.  It seems that
> threads are NOT being assigned to waiting consumers as they become free,
> they are being assigned based on the order they were originally called in.
>
> Simple example (with concurrentConsumers=3, 5 concurrent requests are send
> initially):
>
> 1st consumer/thread starts and sleeps for 10 seconds, completes.
> 2nd consumer/thread starts and sleeps for 5 seconds, completes.
> 3rd consumer/thread starts and sleeps for 3 seconds, completes.
> 4th REQUEST waits for the 1st consumer/thread to complete before being
> processed
> 5th REQUEST waits for the 2nd consumer/thread to complete
>
> I'm using Camel 2.5, ActiveMQ 5.4.1, Spring 3.0.4.  Following is a log
> showing the behavior, followed by my Camel configuration.
>
> This seems pretty basic, so I'm sure I'm just missing something.
>
> --- --- ---
>
> 08:45:03,864 | INFO  | ad-1 | uniworks.core.ThreadTester
> | ThreadTester:  Sending Requests
> 08:45:03,904 | INFO  | er-8 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 15000
> 08:45:03,919 | INFO  | er-7 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 12000
> 08:45:03,949 | INFO  | er-3 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 11000
> 08:45:03,968 | INFO  | er-2 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 10000
> 08:45:03,990 | INFO  | er-4 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 9000
> 08:45:04,017 | INFO  | r-10 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 8000
> 08:45:04,038 | INFO  | er-1 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 7000
> 08:45:04,056 | INFO  | er-5 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 6000
> 08:45:04,084 | INFO  | er-6 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 5000
> 08:45:04,106 | INFO  | er-9 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 4000
> 08:45:04,160 | INFO  | ad-1 | uniworks.core.ThreadTester
> | ThreadTester:  All Requests Sent
> 08:45:08,107 | INFO  | er-9 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 4000
> 08:45:09,085 | INFO  | er-6 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 5000
> 08:45:10,056 | INFO  | er-5 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 6000
> 08:45:11,038 | INFO  | er-1 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 7000
> 08:45:12,017 | INFO  | r-10 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 8000
> 08:45:12,990 | INFO  | er-4 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 9000
> 08:45:13,968 | INFO  | er-2 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 10000
> 08:45:14,949 | INFO  | er-3 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 11000
> 08:45:14,954 | INFO  | er-3 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 1000
> 08:45:15,919 | INFO  | er-7 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 12000
> 08:45:15,924 | INFO  | er-7 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 2000
> 08:45:15,954 | INFO  | er-3 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 1000
> 08:45:17,924 | INFO  | er-7 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 2000
> 08:45:18,904 | INFO  | er-8 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 15000
> 08:45:18,908 | INFO  | er-8 | uniworks.core.ThreadTestServiceImpl
> | Starting Thread:  duration of pause = 3000
> 08:45:21,909 | INFO  | er-8 | uniworks.core.ThreadTestServiceImpl
> | Ending Thread:    duration of pause = 3000
>
> --- --- ---
>
>    <camel:route>
>      <camel:from
> uri="jms:threadTestService?maxConcurrentConsumers=10&amp;concurrentConsumers=8"/>
>      <camel:to uri="bean:threadTestServiceImpl"/>
>    </camel:route>
>
>  <bean id="jmsConnectionFactory"
> class="org.apache.activemq.ActiveMQConnectionFactory">
>    <property name="brokerURL" value="${jms.url}"/>
>  </bean>
>
>  <bean id="pooledConnectionFactory"
> class="org.apache.activemq.pool.PooledConnectionFactory">
>    <property name="maxConnections" value="25"/>
>    <property name="maximumActive" value="500"/>
>    <property name="connectionFactory" ref="jmsConnectionFactory"/>
>  </bean>
>
>
>  <bean id="jmsConfig"
> class="org.apache.camel.component.jms.JmsConfiguration">
>    <property name="connectionFactory" ref="pooledConnectionFactory"/>
>    <property name="transacted" value="false"/>
>    <property name="concurrentConsumers" value="8"/>
>  </bean>
>
>  <bean id="jms"
> class="org.apache.activemq.camel.component.ActiveMQComponent">
>    <property name="configuration" ref="jmsConfig"/>
>  </bean>
> --
> View this message in context: http://camel.465427.n5.nabble.com/concurrentConsumers-issue-tp3285351p3285351.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/