You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by yashgt <ya...@gmail.com> on 2013/12/31 17:56:28 UTC

Asynchronous processing of routes

Hi,

Route1:
	Send Message M1 to MQ Q1. // This message goes to a program that consumes
from Q1 and once some processing is done, write message M2 to Q2.
	Upon receiving Message M2 on Q2, //Q2 receives several messages (M2,N2,P2,
etc.). ONLY when M2 is received, the Route1 should continue.
	Send message M3 to Q3. // The step should be executed only 

Step 2 implies that the message coming on Q2 needs to be inspected and only
if it turns out to be M2, the Route1 should resume. Is there something we
can do to pause a Route while it waits for a message and resume it once the
message arrives?
Can we raise an event so that any Route that is waiting for M2 can be
resumed? We know for sure that among all the parallely executing routes,
only one may wait for M2. What do we need to do to resume the route?

Thanks,
Yash



--
View this message in context: http://camel.465427.n5.nabble.com/Asynchronous-processing-of-routes-tp5745385.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Asynchronous processing of routes

Posted by Henryk Konsek <he...@gmail.com>.
Hi Yash,

Solution to your problem is probably easier than you think. Using
aggregator [1] EIP should solve your issue.

from("jms:Q1").bean(SomeProcessorBean.class).to("jms:Q2");
from("Q2").
  aggregate().constant(true).completionPredicate(body().isEqualTo("M2")).groupExchanges().
  to("Q3");

Aggregation demonstrated below will collect all messages until it
receive M2 message. After M2 message is received, list of all messages
collected so far (including closing M3 message) will be send as single
exchange to Q3.

You probable would like to aggregate groups of messages according to
some correlation identifier [2]. I use constant(true) in the example
to collect all the messages to a single group, but normally you
correlate on header value or body contents.

Cheers.

[1] http://camel.apache.org/aggregator.html
[2] http://camel.apache.org/correlation-identifier.html

-- 
Henryk Konsek
http://henryk-konsek.blogspot.com

Re: Asynchronous processing of routes

Posted by "kraythe ." <kr...@gmail.com>.
Try splitting up routes more and using ActiveMQ or another broker in
between the routes. So instead of "waiting for the message to arrive"
consume the message with a route and resume programming perhaps enriching
with other messages from other sources. You can also create an aggregation
strategy that will aggregate messages and only complete the aggregation
after M2 arrives. Then its up to you what the output of the aggregator is.
This can be more risky in parallel processing with concurrent consumers
since you cant be sure which of the concurrent consumers will consume the
method.

Thinking about suspending and waiting in a route is a synchrous mindset,
think more asynch using signals and aggregation rather than waiting,
calling and replying.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Thu, Jan 2, 2014 at 9:39 AM, Richard Kettelerij <
richardkettelerij@gmail.com> wrote:

> >> Step 2 implies that the message coming on Q2 needs to be inspected and
> only if it turns out to be M2.....
>
> I think the Content-Based Router EIP is a good fit here,
> http://camel.apache.org/content-based-router.html. For example:
>
> from("jms:q2")
>     .choice().when(body().isInstanceOf(M2.class))
>          .to("controlbus:routeId=Route1&action=start")
>     .end()
>
> >> Is there something we can do to pause a Route while it waits for a
> message and resume it once the message arrives?
>
> If a route is paused it cannot do anything, its paused after all. You'll
> need another route or process to resume a route. Pausing/resuming routes
> can be done using the CamelContext (
> http://camel.apache.org/how-can-i-stop-a-route-from-a-route.html) or using
> the ControlBus EIP (http://camel.apache.org/controlbus.html). The latter
> is
> easier to use, as illustrated in the code sample above.
>
> >> Can we raise an event so that any Route that is waiting for M2 can be
> resumed?
> >> What do we need to do to resume the route?
>
> Use the CamelContext API directly or use the ControlBus EIP as I mentioned
>
> Regards,
> Richard
>
>
>
> On Tue, Dec 31, 2013 at 5:56 PM, yashgt <ya...@gmail.com> wrote:
>
> > Hi,
> >
> > Route1:
> >         Send Message M1 to MQ Q1. // This message goes to a program that
> > consumes
> > from Q1 and once some processing is done, write message M2 to Q2.
> >         Upon receiving Message M2 on Q2, //Q2 receives several messages
> > (M2,N2,P2,
> > etc.). ONLY when M2 is received, the Route1 should continue.
> >         Send message M3 to Q3. // The step should be executed only
> >
> > Step 2 implies that the message coming on Q2 needs to be inspected and
> only
> > if it turns out to be M2, the Route1 should resume. Is there something we
> > can do to pause a Route while it waits for a message and resume it once
> the
> > message arrives?
> > Can we raise an event so that any Route that is waiting for M2 can be
> > resumed? We know for sure that among all the parallely executing routes,
> > only one may wait for M2. What do we need to do to resume the route?
> >
> > Thanks,
> > Yash
> >
> >
> >
> > --
> > View this message in context:
> >
> http://camel.465427.n5.nabble.com/Asynchronous-processing-of-routes-tp5745385.html
> > Sent from the Camel - Users mailing list archive at Nabble.com.
> >
>

Re: Asynchronous processing of routes

Posted by Richard Kettelerij <ri...@gmail.com>.
>> Step 2 implies that the message coming on Q2 needs to be inspected and
only if it turns out to be M2.....

I think the Content-Based Router EIP is a good fit here,
http://camel.apache.org/content-based-router.html. For example:

from("jms:q2")
    .choice().when(body().isInstanceOf(M2.class))
         .to("controlbus:routeId=Route1&action=start")
    .end()

>> Is there something we can do to pause a Route while it waits for a
message and resume it once the message arrives?

If a route is paused it cannot do anything, its paused after all. You'll
need another route or process to resume a route. Pausing/resuming routes
can be done using the CamelContext (
http://camel.apache.org/how-can-i-stop-a-route-from-a-route.html) or using
the ControlBus EIP (http://camel.apache.org/controlbus.html). The latter is
easier to use, as illustrated in the code sample above.

>> Can we raise an event so that any Route that is waiting for M2 can be
resumed?
>> What do we need to do to resume the route?

Use the CamelContext API directly or use the ControlBus EIP as I mentioned

Regards,
Richard



On Tue, Dec 31, 2013 at 5:56 PM, yashgt <ya...@gmail.com> wrote:

> Hi,
>
> Route1:
>         Send Message M1 to MQ Q1. // This message goes to a program that
> consumes
> from Q1 and once some processing is done, write message M2 to Q2.
>         Upon receiving Message M2 on Q2, //Q2 receives several messages
> (M2,N2,P2,
> etc.). ONLY when M2 is received, the Route1 should continue.
>         Send message M3 to Q3. // The step should be executed only
>
> Step 2 implies that the message coming on Q2 needs to be inspected and only
> if it turns out to be M2, the Route1 should resume. Is there something we
> can do to pause a Route while it waits for a message and resume it once the
> message arrives?
> Can we raise an event so that any Route that is waiting for M2 can be
> resumed? We know for sure that among all the parallely executing routes,
> only one may wait for M2. What do we need to do to resume the route?
>
> Thanks,
> Yash
>
>
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Asynchronous-processing-of-routes-tp5745385.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>

Re: Asynchronous processing of routes

Posted by Henryk Konsek <he...@gmail.com>.
> Correct me if I am wrong. Camel will correlate the response if the other
> service sends back the response with the same correlation ID as was sent to
> it in the Request.
> In our case, the other services are third-party C++ services which work
> directly with MQ and do not use Camel.

Reply correlation Id is not a Camel-specific feature. This is standard
JMS goodie, so if your C++ broker is JMS-compilant it should set the
proper correlation ID.

If for some reason your C++ broker doesn't set JMS correlation ID
properly, you can use exclusive reply queue with JMS producer (just as
demonstrated on the snippet below) - in such case no selectors on the
correlation ID will be used.

from("direct:Q").to("jms:foo?replyTo=Qresponse&replyToType=Exclusive);

I highly recommend reading Camel JMS component documentation page [1]
as it covers RequestReplay communication with Camel in a details.

Cheers.

[1] http://camel.apache.org/jms

-- 
Henryk Konsek
http://henryk-konsek.blogspot.com

Re: Asynchronous processing of routes

Posted by yashgt <ya...@gmail.com>.
Correct me if I am wrong. Camel will correlate the response if the other
service sends back the response with the same correlation ID as was sent to
it in the Request. If the other service is not using Camel and is solely
reading messages directly from MQ, it will not have the knowledge of where
to find the correlation ID in the Request and where to put it in the
Response.

In our case, the other services are third-party C++ services which work
directly with MQ and do not use Camel.



--
View this message in context: http://camel.465427.n5.nabble.com/Asynchronous-processing-of-routes-tp5745385p5745565.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Asynchronous processing of routes

Posted by yashgt <ya...@gmail.com>.
In my case, there may be multiple routes running in parallel. But I do not
want to have multiple instances of the same route.

All I need is a way by which a route will stop midway and wait for a MQ
message to arrive as a response. I know from your responses that this is
possible if the responder fills the right correlation ID. But in my case the
responder does not fill the correlation ID.

Thanks,
Yash



--
View this message in context: http://camel.465427.n5.nabble.com/Asynchronous-processing-of-routes-tp5745385p5745569.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Asynchronous processing of routes

Posted by Claus Ibsen <cl...@gmail.com>.
Just enable asyncConsumer=true

søndag den 5. januar 2014 skrev kraythe . :

> The interesting thing is that you don't have to do synchronous request
> reply to have a route processing flow with a defined order. Consider a
> route like the following:
>
>
> from("jms:queue:input").to("jms:queue:processA").to("jms:queue:processB").to("jms:queue:results")
>
> And the helper routes:
>
> from("jms:queue:processA").to("bean:A")
> from("jms:queue:processB").to("bean:B")
>
> This is a synchronous mindset and suffers from a number of issues in
> scalability. The route calls each sub process through JMS and then waits
> for the response. However, one message that takes a long time to process
> holds up the entire route for all inputs. Furthermore, if process A takes a
> long time but process B is quick, your ability to federate the route is
> bottlenecked. Contrast that with the following.
>
> from("jms:queue:input").to("jms:queue:processA")
> from("jms:queue:processA").to("bean:A").to("jms:queue:processB")
> from("jms:queue:processB").to("bean:B").to("jms:queue:results")
>
> This is much more scalable. You can run 20 routes of type processA and only
> 2 of process B. That allows messages to flow through asynchronously. This
> is a powerful technique but requires you to divorce your mind from
> input-output synchronous techniques. However it can be even better through
> the use of an asynchronous routing slip. The initial route sets the flow
> and stores it as a list of endpoints in a header. As the message flows
> through, each route pops the next item off the stack that makes up the slip
> and sends the message off to that slip.
>
> Through these techniques the only routes that need be synchronous are those
> where users are waiting for responses (such as RESTlet) and even those
> routes can call synchronous routes.
>
> Camel should open your mind to new posibilites.
>
> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>
>
> On Sun, Jan 5, 2014 at 2:27 AM, Claus Ibsen <claus.ibsen@gmail.com<javascript:;>>
> wrote:
>
> > Hi
> >
> > If you do request/reply over JMS in a Camel route then it correlates
> > and waits for the expected reply message.
> >
> > So you can do
> >
> > from X
> >   to jms foo ? replyTo = bar
> >   to Z
> >
> > Where the jms endpoint is doing request/reply over JMS
> >
> > And you can have multiple routes doing request/reply over JMS as well,
> > and also use shared/exclusive queues etc
> > Read more about this on the JMS page at: http://camel.apache.org/jms
> >
> > from X2
> >   to jms foo ? replyTo = bar
> >   to Z2
> >
> > from X3
> >   to jms foo ? replyTo = bar
> >   to Z3
> >
> >
> >
> >
> > On Sat, Jan 4, 2014 at 9:38 PM, yashgt <yashgt@gmail.com <javascript:;>>
> wrote:
> > > Perhaps my post was not clear enough...
> > >
> > > Route 1 send M1 to the target service over MQ1 and waits for the target
> > to
> > > respond back with message M2 on MQ2.
> > >
> > > Now, like Route1, there are many other routes that expect different
> > > messages.
> > >
> > > RouteX expects MX to come on MQ2. RouteY expects MY to come on MQ2.
> > >
> > > The solutions so far indicate that I should keep consuming from MQ2
> till
> > I
> > > get M2 and then proceed with Route1. This means the RouteX and RouteY
> > will
> > > never get the messages as they have been consumed by Route1.
> > >
> > >
> > >
> > >
> > >
> > > --
> > > View this message in context:
> >
> http://camel.465427.n5.nabble.com/Asynchronous-processing-of-routes-tp5745385p5745542.html
> > > Sent from the Camel - Users mailing list archive at Nabble.com.
> >
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > Red Hat, Inc.
> > Email: cibsen@redhat.com <javascript:;>
> > Twitter: davsclaus
> > Blog: http://davsclaus.com
> > Author of Camel in Action: http://www.manning.com/ibsen
> > Make your Camel applications look hawt, try: http://hawt.io
> >
>


-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
Make your Camel applications look hawt, try: http://hawt.io

Re: Asynchronous processing of routes

Posted by "kraythe ." <kr...@gmail.com>.
The interesting thing is that you don't have to do synchronous request
reply to have a route processing flow with a defined order. Consider a
route like the following:

from("jms:queue:input").to("jms:queue:processA").to("jms:queue:processB").to("jms:queue:results")

And the helper routes:

from("jms:queue:processA").to("bean:A")
from("jms:queue:processB").to("bean:B")

This is a synchronous mindset and suffers from a number of issues in
scalability. The route calls each sub process through JMS and then waits
for the response. However, one message that takes a long time to process
holds up the entire route for all inputs. Furthermore, if process A takes a
long time but process B is quick, your ability to federate the route is
bottlenecked. Contrast that with the following.

from("jms:queue:input").to("jms:queue:processA")
from("jms:queue:processA").to("bean:A").to("jms:queue:processB")
from("jms:queue:processB").to("bean:B").to("jms:queue:results")

This is much more scalable. You can run 20 routes of type processA and only
2 of process B. That allows messages to flow through asynchronously. This
is a powerful technique but requires you to divorce your mind from
input-output synchronous techniques. However it can be even better through
the use of an asynchronous routing slip. The initial route sets the flow
and stores it as a list of endpoints in a header. As the message flows
through, each route pops the next item off the stack that makes up the slip
and sends the message off to that slip.

Through these techniques the only routes that need be synchronous are those
where users are waiting for responses (such as RESTlet) and even those
routes can call synchronous routes.

Camel should open your mind to new posibilites.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Sun, Jan 5, 2014 at 2:27 AM, Claus Ibsen <cl...@gmail.com> wrote:

> Hi
>
> If you do request/reply over JMS in a Camel route then it correlates
> and waits for the expected reply message.
>
> So you can do
>
> from X
>   to jms foo ? replyTo = bar
>   to Z
>
> Where the jms endpoint is doing request/reply over JMS
>
> And you can have multiple routes doing request/reply over JMS as well,
> and also use shared/exclusive queues etc
> Read more about this on the JMS page at: http://camel.apache.org/jms
>
> from X2
>   to jms foo ? replyTo = bar
>   to Z2
>
> from X3
>   to jms foo ? replyTo = bar
>   to Z3
>
>
>
>
> On Sat, Jan 4, 2014 at 9:38 PM, yashgt <ya...@gmail.com> wrote:
> > Perhaps my post was not clear enough...
> >
> > Route 1 send M1 to the target service over MQ1 and waits for the target
> to
> > respond back with message M2 on MQ2.
> >
> > Now, like Route1, there are many other routes that expect different
> > messages.
> >
> > RouteX expects MX to come on MQ2. RouteY expects MY to come on MQ2.
> >
> > The solutions so far indicate that I should keep consuming from MQ2 till
> I
> > get M2 and then proceed with Route1. This means the RouteX and RouteY
> will
> > never get the messages as they have been consumed by Route1.
> >
> >
> >
> >
> >
> > --
> > View this message in context:
> http://camel.465427.n5.nabble.com/Asynchronous-processing-of-routes-tp5745385p5745542.html
> > Sent from the Camel - Users mailing list archive at Nabble.com.
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
> Make your Camel applications look hawt, try: http://hawt.io
>

Re: Asynchronous processing of routes

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

If you do request/reply over JMS in a Camel route then it correlates
and waits for the expected reply message.

So you can do

from X
  to jms foo ? replyTo = bar
  to Z

Where the jms endpoint is doing request/reply over JMS

And you can have multiple routes doing request/reply over JMS as well,
and also use shared/exclusive queues etc
Read more about this on the JMS page at: http://camel.apache.org/jms

from X2
  to jms foo ? replyTo = bar
  to Z2

from X3
  to jms foo ? replyTo = bar
  to Z3




On Sat, Jan 4, 2014 at 9:38 PM, yashgt <ya...@gmail.com> wrote:
> Perhaps my post was not clear enough...
>
> Route 1 send M1 to the target service over MQ1 and waits for the target to
> respond back with message M2 on MQ2.
>
> Now, like Route1, there are many other routes that expect different
> messages.
>
> RouteX expects MX to come on MQ2. RouteY expects MY to come on MQ2.
>
> The solutions so far indicate that I should keep consuming from MQ2 till I
> get M2 and then proceed with Route1. This means the RouteX and RouteY will
> never get the messages as they have been consumed by Route1.
>
>
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Asynchronous-processing-of-routes-tp5745385p5745542.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
Make your Camel applications look hawt, try: http://hawt.io

Re: Asynchronous processing of routes

Posted by yashgt <ya...@gmail.com>.
Perhaps my post was not clear enough...

Route 1 send M1 to the target service over MQ1 and waits for the target to
respond back with message M2 on MQ2.

Now, like Route1, there are many other routes that expect different
messages.

RouteX expects MX to come on MQ2. RouteY expects MY to come on MQ2.

The solutions so far indicate that I should keep consuming from MQ2 till I
get M2 and then proceed with Route1. This means the RouteX and RouteY will
never get the messages as they have been consumed by Route1.





--
View this message in context: http://camel.465427.n5.nabble.com/Asynchronous-processing-of-routes-tp5745385p5745542.html
Sent from the Camel - Users mailing list archive at Nabble.com.