You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Edwin <ed...@gmail.com> on 2012/06/08 20:48:14 UTC

Camel Component - Consumer Threading Question

Hi Folks, 

I am currently developing a Camel component comprising of both consumer and
producer endpoints. The consumer will ping a Socket connection. When a
connection has been established, a session is then created

In the system there will be 100+ sessions. I plan to build the Camel route
dynamically so there will be a route consuming from each session

So for N sessions, there will be N routes and each session and route runs on
it's own thread

Applying threading on the route does not present any issues however this is
proving more challenging on the consumer. 

I have been looking at the ScheduledPollConsumer and I could extend this
class and override it's run() and make one or two other changes so it fits
my use case but it's not the cleanest of solutions

I'm wondering if Camel provides anything out of the box that ensures a
consumer is created in it's own thread? 

Any thoughts on this topic are much appreciated,

Thanks,
Edwin



In the component, I would like the consumer to consume from each of these
individual threads however the consumer should have it's own thread to read
from an individual session thread. So the mapping between session and
consumer is: N sessions : N consumers. create a route that For the consumer,
i'm wondering is it possible to create a thread for every consumer. The
application of this component 

--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Component-Consumer-Threading-Question-tp5714227.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel Component - Consumer Threading Question

Posted by Claus Ibsen <cl...@gmail.com>.
On Fri, Jun 15, 2012 at 6:28 PM, gilboy <jo...@gmail.com> wrote:

> Thanks for the response.
>
> I was just thinking about this a little further. If an exception occurs in
> my consumer, I really just need to restart the consumer not the whole
> route.
>
> If I take this approach, i.e.
>
> class CustomConsumer extends DefaultConsumer implements Runnable{
> ......
>
> public void run(){
>
>   try{
>
>   } catch (Exception exc){
>       doStop();
>      doStart();
>   }
>
> .....
> }
>
> By only restarting the consumer I will not need to worry about inflight
> exchanges, right?
>
>
No


> Is it bad practise to restart a consumer in isolation?
>
>
Depends on the consumer itself, as essentially if there is an inflight
exchange in progress that originated from this consumer, then when that
inflight message is complete it returns control back to the consumer (eg
after the process method). And it depends then what happens next. Eg if you
need to do some after work, eg from a mail consumer, you may need to close
a mail session etc.
And if you have restarted the consumer manually in-between, then some side
effects may happen.

It all depends on what you consumer does, and if it can support this or not.



> Thanks
> Joe
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Camel-Component-Consumer-Threading-Question-tp5714233p5714559.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Camel Component - Consumer Threading Question

Posted by gilboy <jo...@gmail.com>.
Thanks for the response.

I was just thinking about this a little further. If an exception occurs in
my consumer, I really just need to restart the consumer not the whole route. 

If I take this approach, i.e. 

class CustomConsumer extends DefaultConsumer implements Runnable{ 
...... 

public void run(){ 

   try{ 

   } catch (Exception exc){ 
      doStop(); 
      doStart(); 
   } 

..... 
}

By only restarting the consumer I will not need to worry about inflight
exchanges, right?

Is it bad practise to restart a consumer in isolation?

Thanks
Joe

--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Component-Consumer-Threading-Question-tp5714233p5714559.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel Component - Consumer Threading Question

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Read this FAQ how to stop a running route
http://camel.apache.org/how-can-i-stop-a-route-from-a-route.html

eg you need to spin off a new thread to do this, as that would be the
cleanest way.


On Thu, Jun 14, 2012 at 6:47 PM, gilboy <jo...@gmail.com> wrote:

> Thanks again
>
> 1 final question on this. If an exception occurs in the run method of the
> consumer, I want the endpoint, producer and consumer to be re-started.
>
> I was thinking of handling this as follows:
>
> class CustomConsumer extends DefaultConsumer implements Runnable{
> ......
>
> public void run(){
>
>    try{
>
>    } catch (Exception exc){
>       getCamelContext().stopRoute("routeName");
>       getCamelContext().startRoute("routeName");
>    }
>
> .....
> }
>
> However, from the documentation it says I need to call
> getCamelContext().getInflightRepository().remove(Exchange) before calling
> getCamext().stopRoute("routeName"). Unfortunately, in the consumer I do not
> have access to the Exchange object, i.e. I can only create a new Exchange
> object. Any suggestions?
>
> My custom component is a singleton and its referenced in different routes
>
> Thanks
> Joe
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Camel-Component-Consumer-Threading-Question-tp5714233p5714489.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Camel Component - Consumer Threading Question

Posted by gilboy <jo...@gmail.com>.
Thanks again

1 final question on this. If an exception occurs in the run method of the
consumer, I want the endpoint, producer and consumer to be re-started.

I was thinking of handling this as follows:

class CustomConsumer extends DefaultConsumer implements Runnable{
......

public void run(){

    try{

    } catch (Exception exc){
       getCamelContext().stopRoute("routeName");  
       getCamelContext().startRoute("routeName");  
    }

.....
}

However, from the documentation it says I need to call
getCamelContext().getInflightRepository().remove(Exchange) before calling
getCamext().stopRoute("routeName"). Unfortunately, in the consumer I do not
have access to the Exchange object, i.e. I can only create a new Exchange
object. Any suggestions?

My custom component is a singleton and its referenced in different routes

Thanks
Joe

--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Component-Consumer-Threading-Question-tp5714233p5714489.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel Component - Consumer Threading Question

Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Jun 13, 2012 at 9:21 PM, gilboy <jo...@gmail.com> wrote:

> Thanks for your input on this thread Claus.
>
> Just to confirm I understand you correctly, the Consumer would look
> something like:
>
> /class CustomConsumer extends DefaultConsumer implements Runnable{
>
>   ExecutorService service;
>
>   CustomConsumer(EndPoint endPoint, Processor processor){
>      super(endPoint, processor);
>   }
>
>   protected void doStart() throws Exception{
>      super.doStart();
>      executor = getCamelContext().getExecutorServiceManager()
>                      .newFixedThreadPool(this, "ConsumerThread", 1)
>      executor.execute(this);
>   }
>
>   public void run(){
>       //Do Processing
>   }
> }/
>
>
Yes and then stop the executor in the doStop method, there is a method on
executor service manager to do that (eg shutdown)


> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Camel-Component-Consumer-Threading-Question-tp5714233p5714452.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Camel Component - Consumer Threading Question

Posted by gilboy <jo...@gmail.com>.
Thanks for your input on this thread Claus.

Just to confirm I understand you correctly, the Consumer would look
something like:

/class CustomConsumer extends DefaultConsumer implements Runnable{

   ExecutorService service;

   CustomConsumer(EndPoint endPoint, Processor processor){
      super(endPoint, processor);
   }

   protected void doStart() throws Exception{
      super.doStart();
      executor = getCamelContext().getExecutorServiceManager()
                      .newFixedThreadPool(this, "ConsumerThread", 1)
      executor.execute(this);      
   }

   public void run(){
       //Do Processing
   }
}/

--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Component-Consumer-Threading-Question-tp5714233p5714452.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel Component - Consumer Threading Question

Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Jun 12, 2012 at 6:36 PM, Edwin <ed...@gmail.com> wrote:

> Hi
>
> My consumer monitors the state of a vendor product via an API which the
> vendor provides.
>
> My consumer needs to run in its own thread. The logic in the consumer
> method
> will look something like:
>
> MyConsumer extends DefaultConsumer{
>
>        public void method1(){
>                vendorStatsOK = true;
>                while (vendorStatsOK){
>                        //BLOCKING CALL
>                        Message msg = VendorAPI.waitForMessage();
>                        Exchange exchange = endpoint.createExchange();
>                        exchange.getIn().setBody(msg);
>                        getProcessor().process(exchange);
>                        ....
>                }
>
>        }
> }
>
> Since the call to the vendor API is a blocking call, I am thinking the
> ScheduledPollConsumer is not a good fit. I cannot use the event driven
> consumer either as the vendor product will not invoke my method
>
> Hence, I am trying to figure out the best way to have my consumer run its
> own thread, i.e. have method1 invoked in a separate thread.
>
>
Just create a single threaded executor service, and submit a task to it.
Then it runs in a separate thread.

Camel has APIs to create the executor service, and to shutdown it as well.
This allows you to have the thread pool enlisted in JMX, as well having
thread naming pattern and whatnot.

See details at
http://camel.apache.org/threading-model.html



> Thanks!
> Edwin
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Camel-Component-Consumer-Threading-Question-tp5714233p5714377.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Camel Component - Consumer Threading Question

Posted by Edwin <ed...@gmail.com>.
Hi

My consumer monitors the state of a vendor product via an API which the
vendor provides.

My consumer needs to run in its own thread. The logic in the consumer method
will look something like:

MyConsumer extends DefaultConsumer{

	public void method1(){
		vendorStatsOK = true;
		while (vendorStatsOK){
                        //BLOCKING CALL
			Message msg = VendorAPI.waitForMessage();
                        Exchange exchange = endpoint.createExchange();
			exchange.getIn().setBody(msg);
			getProcessor().process(exchange);			
			....
		}

	}
}

Since the call to the vendor API is a blocking call, I am thinking the
ScheduledPollConsumer is not a good fit. I cannot use the event driven
consumer either as the vendor product will not invoke my method

Hence, I am trying to figure out the best way to have my consumer run its
own thread, i.e. have method1 invoked in a separate thread.

Thanks!
Edwin

--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Component-Consumer-Threading-Question-tp5714233p5714377.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel Component - Consumer Threading Question

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Its the component that have the powers to what threading model it uses.
So you can use a thread pool for your consumers, or whatever you want.

The ScheduledPollConsumer is a schedule based consumer, eg it runs every X
time.
Such as the file consumer, which wakes up, and scans for new files.

I am not sure if that model fits a socket event based model.
What socket framework do you use? Usually they already have a threading
model
for consumers you can tap into, with a worker queue, and worker threads etc.

for example Apache Mina, or Netty is good socket frameworks to look into,
using in your component.

On Fri, Jun 8, 2012 at 8:48 PM, Edwin <ed...@gmail.com> wrote:

> Hi Folks,
>
> I am currently developing a Camel component comprising of both consumer and
> producer endpoints. The consumer will ping a Socket connection. When a
> connection has been established, a session is then created
>
> In the system there will be 100+ sessions. I plan to build the Camel route
> dynamically so there will be a route consuming from each session
>
> So for N sessions, there will be N routes and each session and route runs
> on
> it's own thread
>
> Applying threading on the route does not present any issues however this is
> proving more challenging on the consumer.
>
> I have been looking at the ScheduledPollConsumer and I could extend this
> class and override it's run() and make one or two other changes so it fits
> my use case but it's not the cleanest of solutions
>
> I'm wondering if Camel provides anything out of the box that ensures a
> consumer is created in it's own thread?
>
> Any thoughts on this topic are much appreciated,
>
> Thanks,
> Edwin
>
>
>
> In the component, I would like the consumer to consume from each of these
> individual threads however the consumer should have it's own thread to read
> from an individual session thread. So the mapping between session and
> consumer is: N sessions : N consumers. create a route that For the
> consumer,
> i'm wondering is it possible to create a thread for every consumer. The
> application of this component
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Camel-Component-Consumer-Threading-Question-tp5714227.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen