Posted to users@camel.apache.org by Simon Klempert <li...@klempert.net> on 2012/11/01 19:47:51 UTC

Implementing Producer for async subsystem

Hi,

I'm implementing a Camel component that integrates a rule-based system (RBS; in my case the JESS rules engine) into Camel. I need to run one instance of JESS for each endpoint of the component. The JESS instances are created in the endpoints in separate threads: one thread per endpoint running the JESS instance.

A class based on DefaultProducer processes the Exchanges (InOut). It puts the Exchange.getIn() message, marked with a unique ID, into the RBS as a new fact. The RBS thread then processes the fact and, once the result is calculated, calls back a Java method with the ID and the result. The result should then become the out / reply message for the Exchange. The callbacks are not invoked in request order, so I have to map each result / callback by ID to the corresponding InOut Exchange / processing thread.

In more abstract terms: in my Producer I call an async subsystem with a calculation request tagged with an ID. The processing thread should then wait until a "global" callback is called with the same ID and a reply, and then reply with this result.

Route (InOut):
from("direct:test").to("jess:test");

Producer pseudo code:
public void process(Exchange exchange) throws Exception {
  // getHeader(name, type) converts the header value to a String
  String id = exchange.getIn().getHeader("ID", String.class);
  String message = exchange.getIn().getBody(String.class);

  rbs.putRequest(id, message); // thread-safe

  // Here I want to wait for the result / callback with matching ID from the rbs thread

  exchange.getOut().setBody(result);
}

RBS callback (separate thread):
void rbsResult(String ID, String result) {
  // Inform the matching process thread / exchange about the result
}

I've thought about using a TimeoutMap that holds the IDs together with a "CallbackClass" which informs the correct processing thread of the result. Maybe there is an easier way to achieve this with Camel, or you can point me to existing components that implement something similar.


Thanks!

Re: Implementing Producer for async subsystem

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

You can use the AsyncProcessor for this.
http://camel.apache.org/asynchronous-processing.html

Then, when the other system comes back with a reply, you can prepare
the response on the Exchange and then invoke the async callback to
signal Camel to continue routing.

Chapter 10 of the Camel in Action book also has more details and samples.
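
As a rough illustration of the idea above, here is a minimal sketch of a producer that does not block: it parks each exchange and its callback in a map keyed by ID and completes them when the rule engine reports a result. To keep it compilable without Camel on the classpath, AsyncCallback and SimpleExchange below are simplified stand-ins (assumptions, not the real Camel types); JessAsyncProducer, its constructor argument and pendingCount() are hypothetical names. In a real component the same logic would live in a class implementing Camel's AsyncProcessor, whose process(Exchange, AsyncCallback) method returns false when the exchange is completed later.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Stand-in mirroring org.apache.camel.AsyncCallback (assumption for this sketch).
interface AsyncCallback {
    void done(boolean doneSync);
}

// Heavily simplified stand-in for a Camel Exchange (hypothetical).
class SimpleExchange {
    final String id;
    final String in;
    volatile String out;

    SimpleExchange(String id, String in) {
        this.id = id;
        this.in = in;
    }
}

// Hypothetical non-blocking producer: one map entry per in-flight request.
class JessAsyncProducer {
    private static final class Pending {
        final SimpleExchange exchange;
        final AsyncCallback callback;

        Pending(SimpleExchange exchange, AsyncCallback callback) {
            this.exchange = exchange;
            this.callback = callback;
        }
    }

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();
    // Stands in for rbs.putRequest(id, message) from the original post.
    private final BiConsumer<String, String> rbs;

    JessAsyncProducer(BiConsumer<String, String> rbs) {
        this.rbs = rbs;
    }

    // Same shape as AsyncProcessor.process(Exchange, AsyncCallback).
    boolean process(SimpleExchange exchange, AsyncCallback callback) {
        // Register before submitting, so a fast reply always finds its entry.
        pending.put(exchange.id, new Pending(exchange, callback));
        rbs.accept(exchange.id, exchange.in);
        return false; // false: the exchange will be completed asynchronously
    }

    // Called from the rule engine thread, in any order.
    void rbsResult(String id, String result) {
        Pending p = pending.remove(id);
        if (p == null) {
            return; // unknown ID or already completed
        }
        p.exchange.out = result;
        p.callback.done(false); // tell the caller to continue routing
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because no thread waits inside process(), out-of-order replies need no extra handling: each reply removes and completes exactly the entry registered under its ID. This sketch omits error handling and timeouts for requests that never receive a reply.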





-- 
Claus Ibsen
-----------------
Red Hat, Inc.
FuseSource is now part of Red Hat
Email: cibsen@redhat.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: Implementing Producer for async subsystem

Posted by Willem jiang <wi...@gmail.com>.
Currently Camel does not provide a generic ReplyManager that correlates replies by an exchange-related ID. I think you can take a look at the camel-jms component: JmsProducer [1] uses a ReplyManager to handle the responses, which can be correlated by the exchange ID.

[1] https://svn.apache.org/repos/asf/camel/trunk/camel/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
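
For a component that has no such helper available, the correlation itself can be sketched with the JDK alone. The class below is a hypothetical ReplyRegistry (not a Camel class and not how camel-jms implements its ReplyManager): it maps each ID to a CompletableFuture and fails the future if no reply arrives within a timeout, which roughly covers what the TimeoutMap idea from the original question was meant to do. All names and the timeout handling are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: correlates replies to requests by ID and
// fails requests that receive no reply within the given timeout.
class ReplyRegistry implements AutoCloseable {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "reply-timeout");
            t.setDaemon(true); // do not keep the JVM alive for timeouts
            return t;
        });

    // Registers a request; the returned future completes with the reply.
    CompletableFuture<String> register(String id, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (pending.putIfAbsent(id, reply) != null) {
            throw new IllegalStateException("Duplicate correlation ID: " + id);
        }
        timer.schedule(() -> {
            // Only time out if this exact future is still waiting.
            if (pending.remove(id, reply)) {
                reply.completeExceptionally(new TimeoutException("No reply for ID " + id));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return reply;
    }

    // Called from the callback thread. Returns false for an unknown ID,
    // for example a late reply that arrives after the timeout.
    boolean complete(String id, String result) {
        CompletableFuture<String> reply = pending.remove(id);
        return reply != null && reply.complete(result);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

A blocking producer could call register(id, timeout) before submitting the request and then wait on the returned future, while a non-blocking one could attach a completion action to it instead.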

-- 
Willem Jiang

Red Hat, Inc.
FuseSource is now part of Red Hat
Web: http://www.fusesource.com | http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
          http://jnn.javaeye.com (Chinese)
Twitter: willemjiang 
Weibo: willemjiang




