You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by wantastic <Wa...@barclays.com> on 2012/07/25 00:40:46 UTC

Question regarding SEDA & threadpool

Hello all,

I am a beginner of Camel and trying to figure out how to create route
properly.
What I'm trying to do is pretty simple. I have a JMS queue listener which
takes message one by one. Then this message will be logged by WireTap and
gets thrown into a threadpool(a pool of worker threads) to be processed. My
first approach was like this:
<camelContext -->
<route>
	<threadPool id="myThreadPool" threadName="MyThread" poolSize="10"
maxPoolSize="20" maxQueueSize="1000" />
	<threadPool id="dataLoggerPool" threadName="DataLogger" poolSize="1"
maxPoolSize="1" maxQueueSize="1000" />
	
	<from uri="pns-jms:queue:blahblah" />
	<wireTap uri="dataLogger" executorServiceRef="dataLoggerPool"/>
	<threads executorServiceRef="myThreadPool">
		<to uri="myValidationProcess" />
		<to uri="myConvertProcess" />
	</threads>
</camelContext>

However, this turned out to be a wrong implementation since I realized that
messages are not being distributed to multiple threads concurrently. For
example, if I'm doing "Thread.currentThread.sleep(3000)" inside
myConvertProcess, I see threads are being invoked one at a time, not 10
threads at the same time. In order to solve this problem, I had to use SEDA
to send message to the threadpool asynchronously. So my second
implementation came out like this:
	
	<route>
		<from uri="pns-jms:queue:blahblah" />
		<to uri="seda:processMsg?size=1000;blockWhenFull=true" />
    </route>
	<route>
		<from uri="seda:processMsg" />
		<threads executorServiceRef="myThreadPool">
			<to uri="myValidationProcess" />
			<to uri="myConvertProcess" />
		</threads>
	</route>

This seems to be working, however, I get two blocking queues, SEDA and
threadpool, which is not so desirable. Therefore, I decided to go with
SEDA's concurrentConsumers.

	<route>
		<from uri="pns-jms:queue:blahblah" />
		<to uri="seda:processMsg?size=1000;blockWhenFull=true" />
    </route>
	<route>
		<from uri="seda:processMsg?concurrentConsumers=10" />
		<to uri="myValidationProcess" />
		<to uri="myConvertProcess" />
	</route>

This seems to be working okay but I came to another question: What if the
seda queue is full? Will SEDA block from queue to get message when its queue
is full?
Does anyone have a better suggestion for implementation? It is a simple
model: get message, throw it into a threadpool to be picked up by a worker
thread to process it.

If anyone can give feedback on this, I will be very grateful.

Thank you,
Wan




--
View this message in context: http://camel.465427.n5.nabble.com/Question-regarding-SEDA-threadpool-tp5716431.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Question regarding SEDA & threadpool

Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Jul 25, 2012 at 12:40 AM, wantastic <Wa...@barclays.com> wrote:
> Hello all,
>
> I am a beginner of Camel and trying to figure out how to create route
> properly.
> What I'm trying to do is pretty simple. I have a JMS queue listener which
> takes message one by one. Then this message will be logged by WireTap and
> gets thrown into a threadpool(a pool of worker threads) to be processed. My
> first approach was like this:
> <camelContext -->
> <route>
>         <threadPool id="myThreadPool" threadName="MyThread" poolSize="10"
> maxPoolSize="20" maxQueueSize="1000" />
>         <threadPool id="dataLoggerPool" threadName="DataLogger" poolSize="1"
> maxPoolSize="1" maxQueueSize="1000" />
>
>         <from uri="pns-jms:queue:blahblah" />
>         <wireTap uri="dataLogger" executorServiceRef="dataLoggerPool"/>
>         <threads executorServiceRef="myThreadPool">
>                 <to uri="myValidationProcess" />
>                 <to uri="myConvertProcess" />
>         </threads>
> </camelContext>
>
> However, this turned out to be a wrong implementation since I realized that
> messages are not being distributed to multiple threads concurrently. For
> example, if I'm doing "Thread.currentThread.sleep(3000)" inside
> myConvertProcess, I see threads are being invoked one at a time, not 10
> threads at the same time. In order to solve this problem, I had to use SEDA
> to send message to the threadpool asynchronously. So my second
> implementation came out like this:
>

The JMS consumer in the <from> is synchronized by default. eg it waits
until the message is done processed before it consume the next message
from the queue. This ensures the messages is processed sequentielly.

You can use the asynConsumer=true option, to allow it to consume the
next message eagerly.




>         <route>
>                 <from uri="pns-jms:queue:blahblah" />
>                 <to uri="seda:processMsg?size=1000;blockWhenFull=true" />
>     </route>
>         <route>
>                 <from uri="seda:processMsg" />
>                 <threads executorServiceRef="myThreadPool">
>                         <to uri="myValidationProcess" />
>                         <to uri="myConvertProcess" />
>                 </threads>
>         </route>
>
> This seems to be working, however, I get two blocking queues, SEDA and
> threadpool, which is not so desirable. Therefore, I decided to go with
> SEDA's concurrentConsumers.
>
>         <route>
>                 <from uri="pns-jms:queue:blahblah" />
>                 <to uri="seda:processMsg?size=1000;blockWhenFull=true" />
>     </route>
>         <route>
>                 <from uri="seda:processMsg?concurrentConsumers=10" />
>                 <to uri="myValidationProcess" />
>                 <to uri="myConvertProcess" />
>         </route>
>
> This seems to be working okay but I came to another question: What if the
> seda queue is full? Will SEDA block from queue to get message when its queue
> is full?
> Does anyone have a better suggestion for implementation? It is a simple
> model: get message, throw it into a threadpool to be picked up by a worker
> thread to process it.
>
> If anyone can give feedback on this, I will be very grateful.
>
> Thank you,
> Wan
>
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Question-regarding-SEDA-threadpool-tp5716431.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Question regarding SEDA & threadpool

Posted by wantastic <Wa...@barclays.com>.
Thank you Willem,

I have looked into the throttler and it is very useful. I will investigate
further to see how I can make use of it.

Thank you,
Wan



--
View this message in context: http://camel.465427.n5.nabble.com/Question-regarding-SEDA-threadpool-tp5716431p5716534.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Question regarding SEDA & threadpool

Posted by Willem Jiang <wi...@gmail.com>.
On Wed, Jul 25, 2012 at 6:40 AM, wantastic <Wa...@barclays.com> wrote:
> Hello all,
>
> I am a beginner of Camel and trying to figure out how to create route
> properly.
> What I'm trying to do is pretty simple. I have a JMS queue listener which
> takes message one by one. Then this message will be logged by WireTap and
> gets thrown into a threadpool(a pool of worker threads) to be processed. My
> first approach was like this:
> <camelContext -->
> <route>
>         <threadPool id="myThreadPool" threadName="MyThread" poolSize="10"
> maxPoolSize="20" maxQueueSize="1000" />
>         <threadPool id="dataLoggerPool" threadName="DataLogger" poolSize="1"
> maxPoolSize="1" maxQueueSize="1000" />
>
>         <from uri="pns-jms:queue:blahblah" />
>         <wireTap uri="dataLogger" executorServiceRef="dataLoggerPool"/>
>         <threads executorServiceRef="myThreadPool">
>                 <to uri="myValidationProcess" />
>                 <to uri="myConvertProcess" />
>         </threads>
> </camelContext>
>
> However, this turned out to be a wrong implementation since I realized that
> messages are not being distributed to multiple threads concurrently. For
> example, if I'm doing "Thread.currentThread.sleep(3000)" inside
> myConvertProcess, I see threads are being invoked one at a time, not 10
> threads at the same time. In order to solve this problem, I had to use SEDA
> to send message to the threadpool asynchronously. So my second
> implementation came out like this:
>
>         <route>
>                 <from uri="pns-jms:queue:blahblah" />
>                 <to uri="seda:processMsg?size=1000;blockWhenFull=true" />
>     </route>
>         <route>
>                 <from uri="seda:processMsg" />
>                 <threads executorServiceRef="myThreadPool">
>                         <to uri="myValidationProcess" />
>                         <to uri="myConvertProcess" />
>                 </threads>
>         </route>
>
> This seems to be working, however, I get two blocking queues, SEDA and
> threadpool, which is not so desirable. Therefore, I decided to go with
> SEDA's concurrentConsumers.
>
>         <route>
>                 <from uri="pns-jms:queue:blahblah" />
>                 <to uri="seda:processMsg?size=1000;blockWhenFull=true" />
>     </route>
>         <route>
>                 <from uri="seda:processMsg?concurrentConsumers=10" />
>                 <to uri="myValidationProcess" />
>                 <to uri="myConvertProcess" />
>         </route>
>
> This seems to be working okay but I came to another question: What if the
> seda queue is full? Will SEDA block from queue to get message when its queue
> is full?

It's a typical issue of the Queue, you can use the throttler to do the
flow control work.

[1]http://camel.apache.org/throttler.html

> Does anyone have a better suggestion for implementation? It is a simple
> model: get message, throw it into a threadpool to be picked up by a worker
> thread to process it.

Yes, that is the simpler implementation of SEDA.

>
> If anyone can give feedback on this, I will be very grateful.
>
> Thank you,
> Wan
>
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Question-regarding-SEDA-threadpool-tp5716431.html
> Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Question regarding SEDA & threadpool

Posted by wantastic <Wa...@barclays.com>.
Christian,
Thank you for your feedback. Unfortunately, I am not able to use
concurrentConsumer because the queue is exclusive and only allow one
consumer. Do you see any other way to use the model that I explained above,
which has one consumer distributing messages to other worker threads to
process messages.

Cheers,
Wan



--
View this message in context: http://camel.465427.n5.nabble.com/Question-regarding-SEDA-threadpool-tp5716431p5716481.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Question regarding SEDA & threadpool

Posted by Christian Müller <ch...@gmail.com>.
Hello Wan!

Use the concurrentConsumers option [1].
<camelContext>
  <route>
    <from uri="pns-jms:queue:blahblah?concurrentConsumers=10" />
    <wireTap uri="dataLogger" />
    <to uri="myValidationProcess" />
    <to uri="myConvertProcess" />
  <route>
</camelContext>

[1] http://camel.apache.org/jms.html

Best,
Christian

On Wed, Jul 25, 2012 at 12:40 AM, wantastic <Wa...@barclays.com> wrote:

> Hello all,
>
> I am a beginner of Camel and trying to figure out how to create route
> properly.
> What I'm trying to do is pretty simple. I have a JMS queue listener which
> takes message one by one. Then this message will be logged by WireTap and
> gets thrown into a threadpool(a pool of worker threads) to be processed. My
> first approach was like this:
> <camelContext -->
> <route>
>         <threadPool id="myThreadPool" threadName="MyThread" poolSize="10"
> maxPoolSize="20" maxQueueSize="1000" />
>         <threadPool id="dataLoggerPool" threadName="DataLogger"
> poolSize="1"
> maxPoolSize="1" maxQueueSize="1000" />
>
>         <from uri="pns-jms:queue:blahblah" />
>         <wireTap uri="dataLogger" executorServiceRef="dataLoggerPool"/>
>         <threads executorServiceRef="myThreadPool">
>                 <to uri="myValidationProcess" />
>                 <to uri="myConvertProcess" />
>         </threads>
> </camelContext>
>
> However, this turned out to be a wrong implementation since I realized that
> messages are not being distributed to multiple threads concurrently. For
> example, if I'm doing "Thread.currentThread.sleep(3000)" inside
> myConvertProcess, I see threads are being invoked one at a time, not 10
> threads at the same time. In order to solve this problem, I had to use SEDA
> to send message to the threadpool asynchronously. So my second
> implementation came out like this:
>
>         <route>
>                 <from uri="pns-jms:queue:blahblah" />
>                 <to uri="seda:processMsg?size=1000;blockWhenFull=true" />
>     </route>
>         <route>
>                 <from uri="seda:processMsg" />
>                 <threads executorServiceRef="myThreadPool">
>                         <to uri="myValidationProcess" />
>                         <to uri="myConvertProcess" />
>                 </threads>
>         </route>
>
> This seems to be working, however, I get two blocking queues, SEDA and
> threadpool, which is not so desirable. Therefore, I decided to go with
> SEDA's concurrentConsumers.
>
>         <route>
>                 <from uri="pns-jms:queue:blahblah" />
>                 <to uri="seda:processMsg?size=1000;blockWhenFull=true" />
>     </route>
>         <route>
>                 <from uri="seda:processMsg?concurrentConsumers=10" />
>                 <to uri="myValidationProcess" />
>                 <to uri="myConvertProcess" />
>         </route>
>
> This seems to be working okay but I came to another question: What if the
> seda queue is full? Will SEDA block from queue to get message when its
> queue
> is full?
> Does anyone have a better suggestion for implementation? It is a simple
> model: get message, throw it into a threadpool to be picked up by a worker
> thread to process it.
>
> If anyone can give feedback on this, I will be very grateful.
>
> Thank you,
> Wan
>
>
>
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Question-regarding-SEDA-threadpool-tp5716431.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>