Posted to users@camel.apache.org by James Carman <ja...@carmanconsulting.com> on 2011/08/06 14:37:29 UTC

Socket-based Asynchronous Calls...

We have a server that supports a socket-based protocol.  However, it's
not a synchronous situation.  I send a message over the output stream
of the socket and it goes over to the server to be processed.  The
reply message will be received at some later time on the socket's
input stream, not necessarily in the same order the requests were sent.  Now,
I'd like to turn this into a request/reply exchange using Camel.  I
plan on using Netty to implement the socket protocol stuff.  However,
I'm not exactly sure how to go about making the requesting threads
wait while my server does its processing.  It appears that Camel
already has stuff built-in to handle this.  I'm just having trouble
setting up the route properly.

Re: Socket-based Asynchronous Calls...

Posted by Taariq Levack <ta...@gmail.com>.
Indeed it is.

On 16 Aug 2011, at 10:12 PM, James Carman <ja...@carmanconsulting.com> wrote:

> Is your socket endpoint set up to be async?
> 
> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <ta...@gmail.com> wrote:
>> Sure
>> 
>> You can of course solve what I've described many ways, but I'll
>> explain using 3 routes as that's what I used.
>> 
>> This first route is the main route I mentioned earlier, so you send
>> your socket messages here and it's multicast to both the aggregator
>> and to the socket.
>> 
>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>  "someOutboundSocketEndpoint");
>> 
>> 
>> This next route aggregates; both requests and responses are sent
>> here, as you envisaged.
>> from("direct:requestResponseAggregator")
>>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>                .completionSize(2)
>>                .completionTimeout(5000)
>>                .to("direct:requestResponse");
>> // Here you can send the "aggregated" message. In my case it's only the
>> // response I forward, unless there's a timeout, in which case I forward
>> // the request, of course.
>> 
>> Finally the route that consumes the socket responses.
>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>> // This headerEnricher doesn't have to be a processor; you have many
>> // options to add a header.
>> 
>> If that's not clear feel free to ask.
>> 
>> Taariq
>> 
>> 
>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>> <ja...@carmanconsulting.com> wrote:
>>> Care to share an example?  I'm not picturing it.
>>> 
>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>> Hi James
>>>> 
>>>> I did that too for what it's worth.
>>>> I send the message to a route that forwards to both the aggregator and to the socket.
>>>> When the response comes in I use an enricher to add the ID to the headers and then forward to the aggregator.
>>>> 
>>>> Taariq
>>>> 
>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>> 
>>>>> Willem,
>>>>> 
>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>> "server" on the other end of this situation.  I thought about using an
>>>>> aggregator to make sure the response gets matched up with the request
>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>> responses together into one, it would just make sure it matches the
>>>>> correct response with its request.  Does this sound like a valid
>>>>> approach?  If so, how the heck do I go about it? :)
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> James
>>>>> 
>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <wi...@gmail.com> wrote:
>>>>>> Hi James,
>>>>>> 
>>>>>> Camel's async processing engine already provides what you want.
>>>>>> You can take a look at the camel-cxf code [1][2] for some examples.
>>>>>> 
>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>> 
>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>> 
>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hz...@gmail.com>
>>>>>>>  wrote:
>>>>>>>> 
>>>>>>>> Hi James,
>>>>>>>> 
>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>>>> assume you want to use camel-netty [1] to send messages to your server (if you
>>>>>>>> have your own code that does that, you can use it too, but you'd have to
>>>>>>>> write your own Processor or Component). IIUC, your scenario is converting a
>>>>>>>> 2x in-only to a 1x in-out async MEP. You should then treat your exchange as
>>>>>>>> an async in-out and let your framework (Camel) decompose it and compose it
>>>>>>>> back again. I would not keep threads blocked, so I believe your best bet is
>>>>>>>> using the Camel async messaging [2] and Futures (look at the examples using
>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is stateless, so
>>>>>>>> you'll need a correlationId, which you must have already, and something to
>>>>>>>> keep your state. A good bet would be JMS [3], or you could write your own.
>>>>>>>> If you used JMS you would need to use both a correlationId and a replyTo
>>>>>>>> queue.
>>>>>>>>
>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>> 
>>>>>>> 
>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>> for a reply, or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>> fewer HTTP request threads) to do more of a continuation thing.
>>>>>>>
>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>> server process just echoes it back in the response message.
>>>>>>> 
>>>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>>>>>> the same you can do a second transformation/correlation using a claim-check
>>>>>>>> sort of pattern. If you don't want to use jms you can implement your own (in
>>>>>>>> memory) persistence and correlation. You can also use a resequencer [4] if
>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the replies
>>>>>>>> when they become available, and you can control that.
>>>>>>>> 
>>>>>>> 
>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>> it.
>>>>>>> 
>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>>>> hope this helps.
>>>>>>>> Hadrian
>>>>>>>> 
>>>>>>> 
>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Willem
>>>>>> ----------------------------------
>>>>>> FuseSource
>>>>>> Web: http://www.fusesource.com
>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>> Twitter: willemjiang
>>>>>> Weibo: willemjiang
>>>>>> 
>>>> 
>>> 
>> 
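
For readers following the three routes quoted above, here is a minimal
plain-Java sketch of the pairing logic that an aggregator with
completionSize(2) and a completion timeout is being used for.  It is not
Camel code: Msg, RequestResponsePairer, offer() and timeout() are
hypothetical names invented for this illustration, and it assumes each
correlation id is used for exactly one request and one response.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a message; not a Camel type. */
final class Msg {
    final String correlationId;
    final boolean response;   // true for a reply read from the socket
    final String body;

    Msg(String correlationId, boolean response, String body) {
        this.correlationId = correlationId;
        this.response = response;
        this.body = body;
    }
}

/** Pairs a request with its response by correlation id. */
final class RequestResponsePairer {
    private final Map<String, Msg> pending = new ConcurrentHashMap<>();

    /**
     * Offer either half of a pair. Returns empty while only one half has
     * arrived; once both are in, returns the response half to forward.
     */
    Optional<Msg> offer(Msg incoming) {
        Msg first = pending.putIfAbsent(incoming.correlationId, incoming);
        if (first == null) {
            return Optional.empty();            // first half stored, keep waiting
        }
        pending.remove(incoming.correlationId, first);
        return Optional.of(incoming.response ? incoming : first);
    }

    /** On timeout, hand back whatever is still unmatched (normally the request). */
    Optional<Msg> timeout(String correlationId) {
        return Optional.ofNullable(pending.remove(correlationId));
    }
}
```

In this model, the value returned by offer() corresponds to what would be
sent on to "direct:requestResponse", and the value returned by timeout()
is the request that could be fed back in for a retry.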

Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
Is your socket endpoint set up to be async?


Re: Socket-based Asynchronous Calls...

Posted by James Carman <jc...@carmanconsulting.com>.
I am a commons committer so don't think it hasn't crossed my mind. :)
On Aug 28, 2011 11:48 PM, "Claus Ibsen" <cl...@gmail.com> wrote:
> On Sat, Aug 27, 2011 at 10:19 AM, James Carman
> <ja...@carmanconsulting.com> wrote:
>> Well, I can tell you that it certainly didn't seem to work the way I
>> need it to work.  I need a persistent connection (with automatic
>> reconnects).  I also need it to be in/out, but asynchronous (the
>> current incoming message may or may not correspond to the most
>> recently sent message).  For now, I've resorted to just rolling my own
>> solution by directly coding to the Netty API.  I will re-visit with
>> Camel later I'm sure.  Thanks for your help.
>>
>
> Well, you could also consider contributing improvements to the Camel components.
>
> The Camel community loves contributions
> http://camel.apache.org/contributing.html
>
>> On Thu, Aug 25, 2011 at 1:16 AM, Taariq Levack <ta...@gmail.com> wrote:
>>> I expect that the connection will only be closed if the header
>>> NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE is true.
>>>
>>> Glancing at the code I see what you mean, it's quite unlike MINA's producer
>>> which checks the session to see if it's connected and reuses it, but it may
>>> be that under the hood, yet further under the hood, hehe, way further down
>>> into netty's ClientBootstrap and beyond, the connection is being reused. I
>>> don't know for sure.
>>>
>>> This is from Netty front-page, "True connectionless datagram socket support
>>> (since 3.1)":
>>> And glancing at that bit elsewhere I think it's possible to do without this
>>> sort of plumbing, but you'd have to jump into netty code or docs to confirm.
>>>
>>> Depending on timing and other factors I would go ahead with a POC because it
>>> either works or it will work, a failing test from your POC will be most
>>> welcome.
>>>
>>> Taariq
>>>
>>>
>>> On Wed, Aug 24, 2011 at 12:43 PM, James Carman
>>> <ja...@carmanconsulting.com>wrote:
>>>
>>>> I have read the source:
>>>>
>>>> http://svn.apache.org/repos/asf/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
>>>>
>>>> Take a look at the process() method.  In there, there is a block of
>>>> code that does:
>>>>
>>>>        ChannelFuture channelFuture;
>>>>        final Channel channel;
>>>>        try {
>>>>            channelFuture = openConnection(exchange, callback);
>>>>            channel = openChannel(channelFuture);
>>>>        } catch (Exception e) {
>>>>            exchange.setException(e);
>>>>            callback.done(true);
>>>>            return true;
>>>>        }
>>>>
>>>>
>>>> This is not inside an if block or anything and the openConnection()
>>>> method does actually open it, it isn't just returning a
>>>> previously-opened connection or anything.
>>>>
>>>> Perhaps I'm missing something (entirely possible), but it appears that
>>>> it's opening the connection every time the process() method is called.
>>>>
>>>> On Tue, Aug 23, 2011 at 11:09 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>> > That doesn't sound right, what have you read? Logs/docs?
>>>> > And are you using keep-alive?
>>>> >
>>>> > Taariq
>>>> >
>>>> >
>>>> > On 24 Aug 2011, at 12:12 AM, James Carman <james@carmanconsulting.com> wrote:
>>>> >
>>>> >> Well, it looks like the camel-netty component won't work for me.  It
>>>> >> appears that it opens the connection for each exchange.  Am I reading
>>>> >> that right?  What I need is a persistent connection with automatic
>>>> >> reconnects.  Oh well, back to the drawing board.
>>>> >>
>>>> >> On Wed, Aug 17, 2011 at 7:59 AM, James Carman
>>>> >> <ja...@carmanconsulting.com> wrote:
>>>> >>> That's what I've been staring at! :)  Here's what I'm thinking I'm
>>>> >>> going to need to write.  I need an async processor that remembers the
>>>> >>> AsyncCallback and associates it with a correlation id.  Then, when
>>>> >>> another exchange comes in that has the same correlation id, it will
>>>> >>> look up the previous callback and say that it's done.  I have a lot of
>>>> >>> questions, though.  I've never had to get so "down and dirty" with
>>>> >>> Camel before.  The components have just worked for me "off the shelf."
>>>> >>>
>>>> >>> 1.  Do I just copy the input message of the Exchange that comes in
>>>> >>> second to the output message of the originating exchange?
>>>> >>> 2.  How do I do a timeout for the original caller (the CXF request)?
>>>> >>> 3.  How do I detect that the caller has timed out if they do?
>>>> >>>
>>>> >>> I'm sure I'll have more questions, but these are the ones off the top
>>>> >>> of my head.
>>>> >>>
>>>> >>>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <ta...@gmail.com> wrote:
>>>> >>>> James I think the rest of your puzzle is solved by Camel's async API,
>>>> >>>> you might have to check if your task is done, maybe your
>>>> >>>> requestResponse populates some collection of responses and provides
>>>> >>>> some API to return the response given a correlationID.
>>>> >>>> Stare at the async docs [1] a few more times and I'm sure you'll find
>>>> >>>> your answer.
>>>> >>>>
>>>> >>>> [1] http://camel.apache.org/async.html
>>>> >>>>
>>>> >>>> Taariq
>>>> >>>>
>>>> >>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
>>>> >>>> <ja...@carmanconsulting.com> wrote:
>>>> >>>>> No worries!  Thank you for your help.  It helped me understand a bit
>>>> >>>>> more about how these aggregators work.  However, I still don't
>>>> >>>>> understand how to take care of my problem.  I guess I'm going to have
>>>> >>>>> to roll my own processor or something.
>>>> >>>>>
>>>> >>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taariql@gmail.com> wrote:
>>>> >>>>>> Hmmm.
>>>> >>>>>> Maybe others can help with that if it's possible, I haven't had to wrestle with it.
>>>> >>>>>>
>>>> >>>>>> In my case it is actually a cxf service too, but it's asynchronous and I send the response once I have it, indicating either timeout or the actual response.
>>>> >>>>>>
>>>> >>>>>> Sorry I responded to your question without going back to see your other posts.
>>>> >>>>>>
>>>> >>>>>> Taariq
>>>> >>>>>>
>>>> >>>>>> On 16 Aug 2011, at 10:33 PM, James Carman <james@carmanconsulting.com> wrote:
>>>> >>>>>>
>>>> >>>>>>> In my case, the originating request comes from CXF.  How do I send the
>>>> >>>>>>> aggregated response back to CXF?
>>>> >>>>>>>
>>>> >>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taariql@gmail.com> wrote:
>>>> >>>>>>>> The consumer that handles the aggregated/timed-out request or response.
>>>> >>>>>>>>
>>>> >>>>>>>> I have to resend a few times if it's the request, I simply feed it back into "direct:socketRequestRoute" with the header for the number of retry attempts incremented.
>>>> >>>>>>>> If it's the response I can forward to some process.
>>>> >>>>>>>>
>>>> >>>>>>>> Taariq
>>>> >>>>>>>>
>>>> >>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <james@carmanconsulting.com> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>>> What's listening on the:
>>>> >>>>>>>>>
>>>> >>>>>>>>> to("direct:requestResponse")
>>>> >>>>>>>>>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
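
The "remember the callback under a correlation id" idea quoted above can
be sketched in plain Java roughly as follows.  This is only an
illustration under stated assumptions, not Camel or Netty API:
PendingReplies, register() and complete() are hypothetical names, reply
bodies are modelled as Strings, correlation ids are assumed unique per
in-flight request, and CompletableFuture.orTimeout requires Java 9 or
later.  The caller holds the returned future, so it sees a timeout as an
exceptional completion, and a reply that arrives after the timeout is
reported by complete() returning false.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical registry of in-flight requests keyed by correlation id. */
final class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Call before writing the request to the socket; the future completes with the reply. */
    CompletableFuture<String> register(String correlationId, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        // Fail the future with a TimeoutException if no reply arrives in time.
        reply.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        // Drop the entry once the future is done, whether by reply or by timeout.
        reply.whenComplete((body, error) -> pending.remove(correlationId, reply));
        return reply;
    }

    /**
     * Call from the socket reader when a reply arrives, in any order.
     * Returns false if nobody is waiting (unknown id, or the caller timed out).
     */
    boolean complete(String correlationId, String replyBody) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(replyBody);
    }
}
```

A sending thread would call register(id, timeout) and then either block on
the future or attach a continuation to it; the thread reading the socket's
input stream would call complete(id, body) for each reply it decodes.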

Re: Socket-based Asynchronous Calls...

Posted by Claus Ibsen <cl...@gmail.com>.
On Sat, Aug 27, 2011 at 10:19 AM, James Carman
<ja...@carmanconsulting.com> wrote:
> Well, I can tell you that it certainly didn't seem to work the way I
> need it to work.  I need a persistent connection (with automatic
> reconnects).  I also need it to be in/out, but asynchronous (the
> current incoming message may or may not correspond to the most
> recently sent message).  For now, I've resorted to just rolling my own
> solution by directly coding to the Netty API.  I will re-visit with
> Camel later I'm sure.  Thanks for your help.
>

Well, you could also consider contributing improvements to the Camel components.

The Camel community loves contributions
http://camel.apache.org/contributing.html


-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
Well, I can tell you that it certainly didn't seem to work the way I
need it to work.  I need a persistent connection (with automatic
reconnects).  I also need it to be in/out, but asynchronous (the
current incoming message may or may not correspond to the most
recently sent message).  For now, I've resorted to just rolling my own
solution by directly coding to the Netty API.  I will re-visit with
Camel later I'm sure.  Thanks for your help.
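
To make the "in/out, but asynchronous" requirement above concrete, here is a minimal Java sketch of matching out-of-order replies to their requests by correlation id. It is not the Netty-based solution mentioned above and it uses no Camel or Netty API: the class PendingReplies and its methods are hypothetical names chosen for illustration, and orTimeout() assumes Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: tracks requests that are still waiting for a reply.
final class PendingReplies<R> {
    private final ConcurrentMap<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    // Call this before writing the request to the socket, so that a fast
    // reply can never arrive ahead of its registration.
    CompletableFuture<R> register(String correlationId, long timeout, TimeUnit unit) {
        CompletableFuture<R> future = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, future) != null) {
            throw new IllegalStateException("Correlation id already in flight: " + correlationId);
        }
        // Fail the future with a TimeoutException if no reply arrives in time (Java 9+).
        future.orTimeout(timeout, unit);
        // Whatever the outcome (reply, timeout, cancel), forget the entry.
        future.whenComplete((reply, error) -> pending.remove(correlationId, future));
        return future;
    }

    // Call this from the code that reads the socket's input stream.
    // Returns false for unknown, late or duplicate replies.
    boolean complete(String correlationId, R reply) {
        CompletableFuture<R> future = pending.remove(correlationId);
        return future != null && future.complete(reply);
    }

    // Number of requests still waiting for a reply.
    int inFlight() {
        return pending.size();
    }
}
```

The sending side would call register() and then write the request; the reading side would call complete() with the correlation id echoed back by the server. The caller can either block on the returned future or attach a callback to it, so replies may arrive in any order.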

Re: Socket-based Asynchronous Calls...

Posted by Taariq Levack <ta...@gmail.com>.
I expect that the connection will only be closed if the header
NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE is true.

Glancing at the code I see what you mean; it's quite unlike MINA's producer,
which checks the session to see if it's connected and reuses it. It may be
that under the hood, yet further under the hood, hehe, way further down
in Netty's ClientBootstrap and beyond, the connection is being reused. I
don't know for sure.
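
For illustration, the check-and-reuse behaviour attributed to MINA's producer above could be sketched like this. It is only the pattern in plain Java, not code from camel-mina or camel-netty: Link and ReusingConnector are hypothetical names, and a real Netty client would need its own definition of "connected" plus some back-off between reconnect attempts.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a socket session or channel.
interface Link {
    boolean isConnected();
}

// Hands out one shared connection and only opens a new one when the
// current one is missing or no longer connected.
final class ReusingConnector<C extends Link> {
    private final Supplier<C> opener;
    private C current;

    ReusingConnector(Supplier<C> opener) {
        this.opener = opener;
    }

    // Synchronized so that concurrent callers cannot open two connections at once.
    synchronized C acquire() {
        if (current == null || !current.isConnected()) {
            current = opener.get(); // first use, or reconnect after a drop
        }
        return current;
    }
}
```

A producer built this way would call acquire() for every exchange instead of opening a connection each time, which gives both reuse and a simple form of automatic reconnect.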

The Netty front page lists "True connectionless datagram socket support
(since 3.1)", and glancing at that bit elsewhere I think it's possible to do
without this sort of plumbing, but you'd have to dig into the Netty code or
docs to confirm.

Depending on timing and other factors, I would go ahead with a POC, because it
either works or it will work; a failing test from your POC would be most
welcome.

Taariq


On Wed, Aug 24, 2011 at 12:43 PM, James Carman
<ja...@carmanconsulting.com> wrote:

> I have read the source:
>
>
> http://svn.apache.org/repos/asf/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
>
> Take a look at the process() method.  In there, there is a block of
> code that does:
>
>        ChannelFuture channelFuture;
>        final Channel channel;
>        try {
>            channelFuture = openConnection(exchange, callback);
>            channel = openChannel(channelFuture);
>        } catch (Exception e) {
>            exchange.setException(e);
>            callback.done(true);
>            return true;
>        }
>
>
> This is not inside an if block or anything and the openConnection()
> method does actually open it, it isn't just returning a
> previously-opened connection or anything.
>
> Perhaps I'm missing something (entirely possible), but it appears that
> it's opening the connection every time the process() method is called.
>
> On Tue, Aug 23, 2011 at 11:09 PM, Taariq Levack <ta...@gmail.com> wrote:
> > That doesn't sound right, what have you read? Logs/docs?
> > And are you using keep-alive?
> >
> > Taariq
> >
> >
> > On 24 Aug 2011, at 12:12 AM, James Carman <ja...@carmanconsulting.com> wrote:
> >
> >> Well, it looks like the camel-netty component won't work for me.  It
> >> appears that it opens the connection for each exchange.  Am I reading
> >> that right?  What I need is a persistent connection with automatic
> >> reconnects.  Oh well, back to the drawing board.
> >>
> >> On Wed, Aug 17, 2011 at 7:59 AM, James Carman
> >> <ja...@carmanconsulting.com> wrote:
> >>> That's what I've been staring at! :)  Here's what I'm thinking I'm
> >>> going to need to write.  I need an async processor that remembers the
> >>> AsyncCallback and associates it with a correlation id.  Then, when
> >>> another exchange comes in that has the same correlation id, it will
> >>> look up the previous callback and say that it's done.  I have a lot of
> >>> questions, though.  I've never had to get so "down and dirty" with
> >>> Camel before.  The components have just worked for me "off the shelf."
> >>>
> >>> 1.  Do I just copy the input message of the Exchange that comes in
> >>> second to the output message of the originating exchange?
> >>> 2.  How do I do a timeout for the original caller (the CXF request)?
> >>> 3.  How do I detect that the caller has timed out if they do?
> >>>
> >>> I'm sure I'll have more questions, but these are the ones off the top
> >>> of my head.
> >>>
> >>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <ta...@gmail.com> wrote:
> >>>> James I think the rest of your puzzle is solved by Camel's async API,
> >>>> you might have to check if your task is done, maybe your
> >>>> requestResponse populates some collection of responses and provides
> >>>> some API to return the response given a correlationID.
> >>>> Stare at the async docs [1] a few more times and I'm sure you'll find
> >>>> your answer.
> >>>>
> >>>> [1] http://camel.apache.org/async.html
> >>>>
> >>>> Taariq
> >>>>
> >>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
> >>>> <ja...@carmanconsulting.com> wrote:
> >>>>> No worries!  Thank you for your help.  It helped me understand a bit
> >>>>> more about how these aggregators work..  However, I still don't
> >>>>> understand how to take care of my problem.  I guess I'm going to have
> >>>>> to roll my own processor or something.
> >>>>>
> >>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <ta...@gmail.com>
> wrote:
> >>>>>> Hmmm.
> >>>>>> Maybe others can help with that if it's possible, I haven't had to
> wrestle with it.
> >>>>>>
> >>>>>> In my case it is actually a cxf service too, but it's asynchronous
>  and I send the response once I have it, indicating either timeout or the
> actual response.
> >>>>>>
> >>>>>> Sorry I responded to your question without going back to see your
> other posts.
> >>>>>>
> >>>>>> Taariq
> >>>>>>
> >>>>>> On 16 Aug 2011, at 10:33 PM, James Carman <
> james@carmanconsulting.com> wrote:
> >>>>>>
> >>>>>>> In my case, the originating request comes from CXF.  How do I send
> the
> >>>>>>> aggregated response back to CXF?
> >>>>>>>
> >>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <ta...@gmail.com>
> wrote:
> >>>>>>>> The consumer that handles the aggregated/timed-out request or
> response.
> >>>>>>>>
> >>>>>>>> I have to resend a few times if it's the request, I simply feed it
> back into "direct:socketRequestRoute" with the header for the number of
> retry attempts incremented.
> >>>>>>>> If it's the response I can forward to some process.
> >>>>>>>>
> >>>>>>>> Taariq
> >>>>>>>>
> >>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <
> james@carmanconsulting.com> wrote:
> >>>>>>>>
> >>>>>>>>> What's listening on the:
> >>>>>>>>>
> >>>>>>>>> to("direct:requestResponse")
> >>>>>>>>>
> >>>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <
> taariql@gmail.com> wrote:
> >>>>>>>>>> Sure
> >>>>>>>>>>
> >>>>>>>>>> You can of course solve what I've described many ways, but I'll
> >>>>>>>>>> explain using 3 routes as that's what I used.
> >>>>>>>>>>
> >>>>>>>>>> This first route is the main route I mentioned earlier, so you
> send
> >>>>>>>>>> your socket messages here and it's multicast to both the
> aggregator
> >>>>>>>>>> and to the socket.
> >>>>>>>>>>
> >>>>>>>>>>
> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
> >>>>>>>>>>  "someOutboundSocketEndpoint");
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> This next route will aggregate, both requests and responses are
> sent
> >>>>>>>>>> here as you envisaged.
> >>>>>>>>>> from("direct:requestResponseAggregator").
> >>>>>>>>>>                .aggregate(header("someCorrellationId"),
> >>>>>>>>>> requestResponseAggregator)
> >>>>>>>>>>                .completionSize(2)
> >>>>>>>>>>                .completionTimeout(5000)
> >>>>>>>>>>                .to("direct:requestResponse"); //Here you can
> send the
> >>>>>>>>>> "aggregated" message, in my case it's only the response I
> forward
> >>>>>>>>>> unless there's a timeout, then I forward the request of course.
> >>>>>>>>>>
> >>>>>>>>>> Finally the route that consumes the socket responses.
> >>>>>>>>>>
> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
> >>>>>>>>>>   //this headerEnricher doesn't have to be a processor, you have
> many
> >>>>>>>>>> options to add a header.
> >>>>>>>>>>
> >>>>>>>>>> If that's not clear feel free to ask.
> >>>>>>>>>>
> >>>>>>>>>> Taariq
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
> >>>>>>>>>> <ja...@carmanconsulting.com> wrote:
> >>>>>>>>>>> Care to share an example?  I'm not picturing it.
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <
> taariql@gmail.com> wrote:
> >>>>>>>>>>>> Hi James
> >>>>>>>>>>>>
> >>>>>>>>>>>> I did that too for what it's worth.
> >>>>>>>>>>>> I send the message to a route that forwards to both the
> aggregator and to the socket.
> >>>>>>>>>>>> When the response comes in I use an enricher to add the ID to
> the headers and then forward to the aggregator.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Taariq
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <
> james@carmanconsulting.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Willem,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thank you for your help.  I don't think this is doing exactly
> what I
> >>>>>>>>>>>>> need, though.  The real trick here is the asynchronous nature
> of the
> >>>>>>>>>>>>> "server" on the other end of this situation.  I thought about
> using an
> >>>>>>>>>>>>> aggregator to make sure the response gets matched up with the
> request
> >>>>>>>>>>>>> using a correlation id.  The aggregator wouldn't aggregate
> multiple
> >>>>>>>>>>>>> responses together into one, it would just make sure it
> matches the
> >>>>>>>>>>>>> correct response with its request.  Does this sound like a
> valid
> >>>>>>>>>>>>> approach?  If so, how the heck do I go about it? :)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> James
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <
> willem.jiang@gmail.com> wrote:
> >>>>>>>>>>>>>> Hi James,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Camel async process engine already provides the way that you
> want.
> >>>>>>>>>>>>>> You can take a look at the camel-cxf code[1][2] for some
> example.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1]
> http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
> >>>>>>>>>>>>>> [2]
> http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<
> hzbarcea@gmail.com>
> >>>>>>>>>>>>>>>  wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi James,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a
> few thoughts. I
> >>>>>>>>>>>>>>>> assume want to use camel-netty [1] to send messages to
> your sever (if you
> >>>>>>>>>>>>>>>> have your own code that does that, you can use it too, but
> you'd have to
> >>>>>>>>>>>>>>>> write your own Processor or Component). Iiuic, your
> scenario is converting a
> >>>>>>>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat
> your exchange as
> >>>>>>>>>>>>>>>> an async in-out and let your framework (Camel) decompose
> it and compose it
> >>>>>>>>>>>>>>>> back again. I would not keep threads blocked so I believe
> your best bet is
> >>>>>>>>>>>>>>>> using the Camel async messaging [2] and Futures (look at
> the examples using
> >>>>>>>>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is
> stateless so
> >>>>>>>>>>>>>>>> you'll need a correlationId, which you must have already
> and something to
> >>>>>>>>>>>>>>>> keep your state. A good bet would be jms [3], or you could
> write your own.
> >>>>>>>>>>>>>>>> If you used jms you would need to use both a correlationId
> and a replyTo
> >>>>>>>>>>>>>>>> queue.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> from("jms:request-queue").to("netty:output?=correlationId");
> >>>>>>>>>>>>>>>> from("netty:input).to("jms:replyTo-queue")
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here.
>  Eventually,
> >>>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF
> of
> >>>>>>>>>>>>>>> course).  So, I would need to either block the request
> thread, waiting
> >>>>>>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0
> asynchronous
> >>>>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more
> done with
> >>>>>>>>>>>>>>> less http request threads) to do more of a continuation
> thing.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> We already have a correlation id.  The "protocol" requires
> one and the
> >>>>>>>>>>>>>>> server process just echos it back in the response message.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> You may have to play a bit with the correlationId and if
> you cannot use
> >>>>>>>>>>>>>>>> the same you can do a second transformation/correlation
> using a claim-check
> >>>>>>>>>>>>>>>> sort of pattern. If you don't want to use jms you can
> implement your own (in
> >>>>>>>>>>>>>>>> memory) persistence and correlation. You can also use a
> resequencer [4] if
> >>>>>>>>>>>>>>>> you want to enforce the order. If you use asyncCallback,
> you get the replies
> >>>>>>>>>>>>>>>> when they become available, and you can control that.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I don't think a resequencer is necessary.  I don't want to
> guarantee
> >>>>>>>>>>>>>>> the ordering.  I'm mostly interested in throughput here.
>  So, if a
> >>>>>>>>>>>>>>> message comes in after another, but it can be processed
> faster, so be
> >>>>>>>>>>>>>>> it.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more
> thought, but I
> >>>>>>>>>>>>>>>> hope this helps.
> >>>>>>>>>>>>>>>> Hadrian
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> You have been very helpful.  Thank you for taking the time!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> Willem
> >>>>>>>>>>>>>> ----------------------------------
> >>>>>>>>>>>>>> FuseSource
> >>>>>>>>>>>>>> Web: http://www.fusesource.com
> >>>>>>>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
> >>>>>>>>>>>>>>         http://jnn.javaeye.com (Chinese)
> >>>>>>>>>>>>>> Twitter: willemjiang
> >>>>>>>>>>>>>> Weibo: willemjiang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >
>

Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
I have read the source:

http://svn.apache.org/repos/asf/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java

Take a look at the process() method.  In there, there is a block of
code that does:

        ChannelFuture channelFuture;
        final Channel channel;
        try {
            channelFuture = openConnection(exchange, callback);
            channel = openChannel(channelFuture);
        } catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }


This is not inside an if block or anything, and the openConnection()
method really does open a new connection; it isn't just returning a
previously opened one.

Perhaps I'm missing something (entirely possible), but it appears that
it's opening the connection every time the process() method is called.
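
For contrast, here is a minimal sketch of the "open once, reuse, reopen
only when dropped" behaviour asked for elsewhere in this thread. It is
plain Java and deliberately does not use Netty or Camel: Connection,
ReusableConnection and FakeConnection are hypothetical names made up for
illustration, and none of this is how NettyProducer is implemented.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a socket or channel; not a Camel or Netty type. */
interface Connection {
    boolean isOpen();
}

/** Opens lazily, reuses the connection while it is open, reopens after a drop. */
final class ReusableConnection<C extends Connection> {
    private final Supplier<C> opener;
    private C current;

    ReusableConnection(Supplier<C> opener) {
        this.opener = opener;
    }

    /** Returns the cached connection, opening a new one only when needed. */
    synchronized C get() {
        if (current == null || !current.isOpen()) {
            current = opener.get();
        }
        return current;
    }
}

final class ReusableConnectionDemo {
    /** Trivial in-memory connection used only for this demonstration. */
    static final class FakeConnection implements Connection {
        volatile boolean open = true;

        @Override
        public boolean isOpen() {
            return open;
        }
    }

    public static void main(String[] args) {
        int[] opens = {0}; // counts how often the opener actually runs
        ReusableConnection<FakeConnection> holder =
                new ReusableConnection<>(() -> {
                    opens[0]++;
                    return new FakeConnection();
                });

        FakeConnection first = holder.get();
        FakeConnection second = holder.get(); // same instance, no new open
        first.open = false;                   // simulate a dropped connection
        FakeConnection third = holder.get();  // a fresh connection is opened

        System.out.println("reused=" + (first == second)
                + " reopened=" + (first != third)
                + " opens=" + opens[0]);
        // prints: reused=true reopened=true opens=2
    }
}
```

The check in get() is the "if block" that the excerpt above lacks. A real
implementation would also need to decide what to do with requests that
were in flight when the connection dropped.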

On Tue, Aug 23, 2011 at 11:09 PM, Taariq Levack <ta...@gmail.com> wrote:
> That doesn't sound right, what have you read? Logs/docs?
> And are you using keep-alive?
>
> Taariq
>
>
> On 24 Aug 2011, at 12:12 AM, James Carman <ja...@carmanconsulting.com> wrote:
>
>> Well, it looks like the camel-netty component won't work for me.  It
>> appears that it opens the connection for each exchange.  Am I reading
>> that right?  What I need is a persistent connection with automatic
>> reconnects.  Oh well, back to the drawing board.
>>
>> On Wed, Aug 17, 2011 at 7:59 AM, James Carman
>> <ja...@carmanconsulting.com> wrote:
>>> That's what I've been staring at! :)  Here's what I'm thinking I'm
>>> going to need to write.  I need an async processor that remembers the
>>> AsyncCallback and associates it with a correlation id.  Then, when
>>> another exchange comes in that has the same correlation id, it will
>>> look up the previous callback and say that it's done.  I have a lot of
>>> questions, though.  I've never had to get so "down and dirty" with
>>> Camel before.  The components have just worked for me "off the shelf."
>>>
>>> 1.  Do I just copy the input message of the Exchange that comes in
>>> second to the output message of the originating exchange?
>>> 2.  How do I do a timeout for the original caller (the CXF request)?
>>> 3.  How do I detect that the caller has timed out if they do?
>>>
>>> I'm sure I'll have more questions, but these are the ones off the top
>>> of my head.
>>>
>>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <ta...@gmail.com> wrote:
>>>> James I think the rest of your puzzle is solved by Camel's async API,
>>>> you might have to check if your task is done, maybe your
>>>> requestResponse populates some collection of responses and provides
>>>> some API to return the response given a correlationID.
>>>> Stare at the async docs [1] a few more times and I'm sure you'll find
>>>> your answer.
>>>>
>>>> [1] http://camel.apache.org/async.html
>>>>
>>>> Taariq
>>>>
>>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
>>>> <ja...@carmanconsulting.com> wrote:
>>>>> No worries!  Thank you for your help.  It helped me understand a bit
>>>>> more about how these aggregators work.  However, I still don't
>>>>> understand how to take care of my problem.  I guess I'm going to have
>>>>> to roll my own processor or something.
>>>>>
>>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>>> Hmmm.
>>>>>> Maybe others can help with that if it's possible, I haven't had to wrestle with it.
>>>>>>
>>>>>> In my case it is actually a CXF service too, but it's asynchronous and I send the response once I have it, indicating either timeout or the actual response.
>>>>>>
>>>>>> Sorry I responded to your question without going back to see your other posts.
>>>>>>
>>>>>> Taariq
>>>>>>
>>>>>> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>>>>
>>>>>>> In my case, the originating request comes from CXF.  How do I send the
>>>>>>> aggregated response back to CXF?
>>>>>>>
>>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>>>>> The consumer that handles the aggregated/timed-out request or response.
>>>>>>>>
>>>>>>>> I have to resend a few times if it's the request, I simply feed it back into "direct:socketRequestRoute" with the header for the number of retry attempts incremented.
>>>>>>>> If it's the response I can forward to some process.
>>>>>>>>
>>>>>>>> Taariq
>>>>>>>>
>>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>>>>>>
>>>>>>>>> What's listening on the:
>>>>>>>>>
>>>>>>>>> to("direct:requestResponse")
>>>>>>>>>
>>>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>>>>>>> Sure
>>>>>>>>>>
>>>>>>>>>> You can of course solve what I've described many ways, but I'll
>>>>>>>>>> explain using 3 routes as that's what I used.
>>>>>>>>>>
>>>>>>>>>> This first route is the main route I mentioned earlier, so you send
>>>>>>>>>> your socket messages here and it's multicast to both the aggregator
>>>>>>>>>> and to the socket.
>>>>>>>>>>
>>>>>>>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>>>>>>>>  "someOutboundSocketEndpoint");
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This next route will aggregate, both requests and responses are sent
>>>>>>>>>> here as you envisaged.
>>>>>>>>>> from("direct:requestResponseAggregator")
>>>>>>>>>>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>>>>>>>>>                .completionSize(2)
>>>>>>>>>>                .completionTimeout(5000)
>>>>>>>>>>                .to("direct:requestResponse");
>>>>>>>>>> // Here you can send the "aggregated" message. In my case it's only the
>>>>>>>>>> // response I forward unless there's a timeout, then I forward the request of course.
>>>>>>>>>>
>>>>>>>>>> Finally the route that consumes the socket responses.
>>>>>>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>>>>>>>> // This headerEnricher doesn't have to be a processor; you have many
>>>>>>>>>> // options to add a header.
>>>>>>>>>>
>>>>>>>>>> If that's not clear feel free to ask.
>>>>>>>>>>
>>>>>>>>>> Taariq
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>>>>>>>> <ja...@carmanconsulting.com> wrote:
>>>>>>>>>>> Care to share an example?  I'm not picturing it.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>>>>>>>>> Hi James
>>>>>>>>>>>>
>>>>>>>>>>>> I did that too for what it's worth.
>>>>>>>>>>>> I send the message to a route that forwards to both the aggregator and to the socket.
>>>>>>>>>>>> When the response comes in I use an enricher to add the ID to the headers and then forward to the aggregator.
>>>>>>>>>>>>
>>>>>>>>>>>> Taariq
>>>>>>>>>>>>
>>>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Willem,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>>>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>>>>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>>>>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>>>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>>>>>>>>> responses together into one, it would just make sure it matches the
>>>>>>>>>>>>> correct response with its request.  Does this sound like a valid
>>>>>>>>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> James
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <wi...@gmail.com> wrote:
>>>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Camel async process engine already provides the way that you want.
>>>>>>>>>>>>>> You can take a look at the camel-cxf code[1][2] for some examples.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hz...@gmail.com>
>>>>>>>>>>>>>>>  wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>>>>>>>>>>>> assume you want to use camel-netty [1] to send messages to your server (if you
>>>>>>>>>>>>>>>> have your own code that does that, you can use it too, but you'd have to
>>>>>>>>>>>>>>>> write your own Processor or Component). IIUC, your scenario is converting a
>>>>>>>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your exchange as
>>>>>>>>>>>>>>>> an async in-out and let your framework (Camel) decompose it and compose it
>>>>>>>>>>>>>>>> back again. I would not keep threads blocked so I believe your best bet is
>>>>>>>>>>>>>>>> using the Camel async messaging [2] and Futures (look at the examples using
>>>>>>>>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is stateless so
>>>>>>>>>>>>>>>> you'll need a correlationId, which you must have already and something to
>>>>>>>>>>>>>>>> keep your state. A good bet would be jms [3], or you could write your own.
>>>>>>>>>>>>>>>> If you used jms you would need to use both a correlationId and a replyTo
>>>>>>>>>>>>>>>> queue.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>>>>>>>>>> fewer HTTP request threads) to do more of a continuation thing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>>>>>>>>>> server process just echoes it back in the response message.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>>>>>>>>>>>>>> the same you can do a second transformation/correlation using a claim-check
>>>>>>>>>>>>>>>> sort of pattern. If you don't want to use jms you can implement your own (in
>>>>>>>>>>>>>>>> memory) persistence and correlation. You can also use a resequencer [4] if
>>>>>>>>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the replies
>>>>>>>>>>>>>>>> when they become available, and you can control that.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>>>>>>>>> it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>>>>>>>>>>>> hope this helps.
>>>>>>>>>>>>>>>> Hadrian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Willem
>>>>>>>>>>>>>> ----------------------------------
>>>>>>>>>>>>>> FuseSource
>>>>>>>>>>>>>> Web: http://www.fusesource.com
>>>>>>>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>>>>>>>>>> Twitter: willemjiang
>>>>>>>>>>>>>> Weibo: willemjiang
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
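
The idea quoted above (remember something per correlation id when the
request goes out, complete it when a reply with the same id arrives, and
give up after a timeout) can be sketched in plain Java as follows. This is
an illustration under assumptions, not Camel's API: PendingReplies and its
methods are hypothetical names, a CompletableFuture stands in for the
stored AsyncCallback, and orTimeout requires Java 9 or later. In a Camel
AsyncProcessor the completion step is presumably where the reply would be
copied onto the original exchange before signalling the callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical registry that matches out-of-order replies to waiting requests. */
final class PendingReplies<R> {
    private final ConcurrentMap<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    /** Call before writing the request to the socket. */
    CompletableFuture<R> register(String correlationId, long timeout, TimeUnit unit) {
        CompletableFuture<R> reply = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, reply) != null) {
            throw new IllegalStateException("duplicate correlation id: " + correlationId);
        }
        // orTimeout fails the future with a TimeoutException if no reply arrives
        // in time; whenComplete drops the map entry however the future finishes.
        reply.orTimeout(timeout, unit)
             .whenComplete((value, error) -> pending.remove(correlationId, reply));
        return reply;
    }

    /** Call from the thread reading the socket; false means unknown or too late. */
    boolean complete(String correlationId, R response) {
        CompletableFuture<R> reply = pending.remove(correlationId);
        return reply != null && reply.complete(response);
    }
}

final class PendingRepliesDemo {
    public static void main(String[] args) throws Exception {
        PendingReplies<String> replies = new PendingReplies<>();

        // Replies arrive in a different order than the requests were sent.
        CompletableFuture<String> a = replies.register("id-1", 1, TimeUnit.SECONDS);
        CompletableFuture<String> b = replies.register("id-2", 1, TimeUnit.SECONDS);
        replies.complete("id-2", "reply-2");
        replies.complete("id-1", "reply-1");
        System.out.println(a.get() + " " + b.get()); // prints: reply-1 reply-2

        // No reply ever arrives for id-3, so the waiting caller sees a timeout.
        CompletableFuture<String> late = replies.register("id-3", 50, TimeUnit.MILLISECONDS);
        try {
            late.get();
        } catch (ExecutionException e) {
            System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
        }
        // A reply that shows up after the timeout is rejected.
        System.out.println("late reply accepted: " + replies.complete("id-3", "too late"));
    }
}
```

The return value of complete() is one way to notice that the original
caller has already timed out: a false result means nobody is waiting for
that reply any more, so it can be logged or dropped.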

Re: Socket-based Asynchronous Calls...

Posted by Taariq Levack <ta...@gmail.com>.
That doesn't sound right, what have you read? Logs/docs?
And are you using keep-alive?

Taariq


On 24 Aug 2011, at 12:12 AM, James Carman <ja...@carmanconsulting.com> wrote:

> Well, it looks like the camel-netty component won't work for me.  It
> appears that it opens the connection for each exchange.  Am I reading
> that right?  What I need is a persistent connection with automatic
> reconnects.  Oh well, back to the drawing board.
> 
> On Wed, Aug 17, 2011 at 7:59 AM, James Carman
> <ja...@carmanconsulting.com> wrote:
>> That's what I've been staring at! :)  Here's what I'm thinking I'm
>> going to need to write.  I need an async processor that remembers the
>> AsyncCallback and associates it with a correlation id.  Then, when
>> another exchange comes in that has the same correlation id, it will
>> lookup the previous callback and say that it's done.  I have a lot of
>> questions, though.  I've never had to get so "down and dirty" with
>> Camel before.  The components have just worked for me "off the shelf."
>> 
>> 1.  Do I just copy the input message of the Exchange that comes in
>> second to the output message of the originating exchange?
>> 2.  How do I do a timeout for the original caller (the CXF request)?
>> 3.  How do I detect that the caller has timed out if they do?
>> 
>> I'm sure I'll have more questions, but these are the ones off the top
>> of my head.
>> 
>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <ta...@gmail.com> wrote:
>>> James I think the rest of your puzzle is solved by Camel's async API,
>>> you might have to check if your task is done, maybe your
>>> requestResponse populates some collection of responses and provides
>>> some API to return the response given a correlationID.
>>> Stare at the async docs [1] a few more times and I'm sure you'll find
>>> your answer.
>>> 
>>> [1] http://camel.apache.org/async.html
>>> 
>>> Taariq
>>> 
>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
>>> <ja...@carmanconsulting.com> wrote:
>>>> No worries!  Thank you for your help.  It helped me understand a bit
>>>> more about how these aggregators work..  However, I still don't
>>>> understand how to take care of my problem.  I guess I'm going to have
>>>> to roll my own processor or something.
>>>> 
>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>> Hmmm.
>>>>> Maybe others can help with that if it's possible, I haven't had to wrestle with it.
>>>>> 
>>>>> In my case it is actually a cxf service too, but it's asynchronous  and I send the response once I have it, indicating either timeout or the actual response.
>>>>> 
>>>>> Sorry I responded to your question without going back to see your other posts.
>>>>> 
>>>>> Taariq
>>>>> 
>>>>> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>>> 
>>>>>> In my case, the originating request comes from CXF.  How do I send the
>>>>>> aggregated response back to CXF?
>>>>>> 
>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>>>> The consumer that handles the aggregated/timed-out request or response.
>>>>>>> 
>>>>>>> I have to resend a few times if it's the request, I simply feed it back into "direct:socketRequestRoute" with the header for the number of retry attempts incremented.
>>>>>>> If it's the response I can forward to some process.
>>>>>>> 
>>>>>>> Taariq
>>>>>>> 
>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>>>>> 
>>>>>>>> What's listening on the:
>>>>>>>> 
>>>>>>>> to("direct:requestResponse")
>>>>>>>> 
>>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>>>>>> Sure
>>>>>>>>> 
>>>>>>>>> You can of course solve what I've described many ways, but I'll
>>>>>>>>> explain using 3 routes as that's what I used.
>>>>>>>>> 
>>>>>>>>> This first route is the main route I mentioned earlier, so you send
>>>>>>>>> your socket messages here and it's multicast to both the aggregator
>>>>>>>>> and to the socket.
>>>>>>>>> 
>>>>>>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>>>>>>>  "someOutboundSocketEndpoint");
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> This next route will aggregate, both requests and responses are sent
>>>>>>>>> here as you envisaged.
>>>>>>>>> from("direct:requestResponseAggregator").
>>>>>>>>>                .aggregate(header("someCorrellationId"),
>>>>>>>>> requestResponseAggregator)
>>>>>>>>>                .completionSize(2)
>>>>>>>>>                .completionTimeout(5000)
>>>>>>>>>                .to("direct:requestResponse"); //Here you can send the
>>>>>>>>> "aggregated" message, in my case it's only the response I forward
>>>>>>>>> unless there's a timeout, then I forward the request of course.
>>>>>>>>> 
>>>>>>>>> Finally the route that consumes the socket responses.
>>>>>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>>>>>>>   //this headerEnricher doesn't have to be a processor, you have many
>>>>>>>>> options to add a header.
>>>>>>>>> 
>>>>>>>>> If that's not clear feel free to ask.
>>>>>>>>> 
>>>>>>>>> Taariq
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>>>>>>> <ja...@carmanconsulting.com> wrote:
>>>>>>>>>> Care to share an example?  I'm not picturing it.
>>>>>>>>>> 
>>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>>>>>>>> Hi James
>>>>>>>>>>> 
>>>>>>>>>>> I did that too for what it's worth.
>>>>>>>>>>> I send the message to a route that forwards to both the aggregator and to the socket.
>>>>>>>>>>> When the response comes in I use an enricher to add the ID to the headers and then forward to the aggregator.
>>>>>>>>>>> 
>>>>>>>>>>> Taariq
>>>>>>>>>>> 
>>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Willem,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>>>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>>>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>>>>>>>> responses together into one, it would just make sure it matches the
>>>>>>>>>>>> correct response with its request.  Does this sound like a valid
>>>>>>>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> 
>>>>>>>>>>>> James
>>>>>>>>>>>> 
>>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <wi...@gmail.com> wrote:
>>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Camel async process engine already provides the way that you want.
>>>>>>>>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hz...@gmail.com>
>>>>>>>>>>>>>>  wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>>>>>>>>>>> assume want to use camel-netty [1] to send messages to your sever (if you
>>>>>>>>>>>>>>> have your own code that does that, you can use it too, but you'd have to
>>>>>>>>>>>>>>> write your own Processor or Component). Iiuic, your scenario is converting a
>>>>>>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your exchange as
>>>>>>>>>>>>>>> an async in-out and let your framework (Camel) decompose it and compose it
>>>>>>>>>>>>>>> back again. I would not keep threads blocked so I believe your best bet is
>>>>>>>>>>>>>>> using the Camel async messaging [2] and Futures (look at the examples using
>>>>>>>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is stateless so
>>>>>>>>>>>>>>> you'll need a correlationId, which you must have already and something to
>>>>>>>>>>>>>>> keep your state. A good bet would be jms [3], or you could write your own.
>>>>>>>>>>>>>>> If you used jms you would need to use both a correlationId and a replyTo
>>>>>>>>>>>>>>> queue.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>>>>>>> from("netty:input).to("jms:replyTo-queue")
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>>>>>>>>> less http request threads) to do more of a continuation thing.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>>>>>>>>> server process just echos it back in the response message.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> You may have to play a bit with the correlationId, and if you cannot use
>>>>>>>>>>>>>>> the same one you can do a second transformation/correlation using a claim-check
>>>>>>>>>>>>>>> sort of pattern. If you don't want to use JMS you can implement your own
>>>>>>>>>>>>>>> (in-memory) persistence and correlation. You can also use a resequencer [4] if
>>>>>>>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the replies
>>>>>>>>>>>>>>> when they become available, and you can control that.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>>>>>>>> it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>>>>>>>>>>> hope this helps.
>>>>>>>>>>>>>>> Hadrian
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Willem
>>>>>>>>>>>>> ----------------------------------
>>>>>>>>>>>>> FuseSource
>>>>>>>>>>>>> Web: http://www.fusesource.com
>>>>>>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>>>>>>>>> Twitter: willemjiang
>>>>>>>>>>>>> Weibo: willemjiang
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> 

Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
Well, it looks like the camel-netty component won't work for me.  It
appears that it opens the connection for each exchange.  Am I reading
that right?  What I need is a persistent connection with automatic
reconnects.  Oh well, back to the drawing board.
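
The "persistent connection with automatic reconnects" requirement can be
sketched in plain Java, independent of camel-netty. Everything below is an
assumption made for illustration: PersistentConnection, its connector
supplier and the doubling back-off are hypothetical names and choices, not
Camel or Netty APIs.

```java
import java.util.function.Supplier;

// Hypothetical wrapper (not a camel-netty class): holds one shared connection
// and reopens it on demand after it has been marked as broken.
class PersistentConnection<C> {
    private final Supplier<C> connector;      // opens a new connection, e.g. a socket or channel
    private final int maxAttempts;            // how many times to try before giving up
    private final long initialBackoffMillis;  // first pause between attempts, doubled each time
    private C current;

    PersistentConnection(Supplier<C> connector, int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.connector = connector;
        this.maxAttempts = maxAttempts;
        this.initialBackoffMillis = initialBackoffMillis;
    }

    // Returns the shared connection, opening it first if there is none.
    synchronized C get() {
        if (current == null) {
            current = connectWithRetry();
        }
        return current;
    }

    // Call after a failed read or write so that the next get() reconnects.
    synchronized void invalidate() {
        current = null;
    }

    private C connectWithRetry() {
        long backoff = initialBackoffMillis;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return connector.get();
            } catch (RuntimeException e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while reconnecting", ie);
                }
                backoff *= 2;
            }
        }
        throw new IllegalStateException("Could not connect after " + maxAttempts + " attempts", last);
    }
}
```

A producer would call get() before each write and invalidate() after an I/O
error, so every exchange shares one connection and only a failure triggers a
reconnect.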

On Wed, Aug 17, 2011 at 7:59 AM, James Carman
<ja...@carmanconsulting.com> wrote:
> That's what I've been staring at! :)  Here's what I'm thinking I'm
> going to need to write.  I need an async processor that remembers the
> AsyncCallback and associates it with a correlation id.  Then, when
> another exchange comes in that has the same correlation id, it will
> look up the previous callback and say that it's done.  I have a lot of
> questions, though.  I've never had to get so "down and dirty" with
> Camel before.  The components have just worked for me "off the shelf."
>
> 1.  Do I just copy the input message of the Exchange that comes in
> second to the output message of the originating exchange?
> 2.  How do I do a timeout for the original caller (the CXF request)?
> 3.  How do I detect that the caller has timed out if they do?
>
> I'm sure I'll have more questions, but these are the ones off the top
> of my head.

Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
That's what I've been staring at! :)  Here's what I'm thinking I'm
going to need to write.  I need an async processor that remembers the
AsyncCallback and associates it with a correlation id.  Then, when
another exchange comes in that has the same correlation id, it will
look up the previous callback and say that it's done.  I have a lot of
questions, though.  I've never had to get so "down and dirty" with
Camel before.  The components have just worked for me "off the shelf."

1.  Do I just copy the input message of the Exchange that comes in
second to the output message of the originating exchange?
2.  How do I do a timeout for the original caller (the CXF request)?
3.  How do I detect that the caller has timed out if they do?

I'm sure I'll have more questions, but these are the ones off the top
of my head.
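
The idea of remembering a callback under its correlation id can be sketched
with the JDK alone. This is only an illustration under stated assumptions:
PendingReplies is a hypothetical helper, not a Camel class, and a
CompletableFuture stands in for the AsyncCallback that Camel passes to
AsyncProcessor.process(Exchange, AsyncCallback). The timeout uses
CompletableFuture.orTimeout, which needs Java 9 or later.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a Camel class): tracks requests still waiting for a reply.
class PendingReplies<R> {
    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    // Request path: remember the caller under its correlation id.
    CompletableFuture<R> register(String correlationId, long timeout, TimeUnit unit) {
        CompletableFuture<R> future = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, future) != null) {
            throw new IllegalStateException("Duplicate correlation id: " + correlationId);
        }
        // orTimeout fails the future with a TimeoutException if no reply arrives in time.
        // whenComplete drops the entry on reply, timeout or cancellation, so the map cannot leak.
        future.orTimeout(timeout, unit)
              .whenComplete((reply, error) -> pending.remove(correlationId, future));
        return future;
    }

    // Reply path: returns false when nobody is waiting any more
    // (unknown id, or the caller has already timed out).
    boolean complete(String correlationId, R reply) {
        CompletableFuture<R> future = pending.remove(correlationId);
        return future != null && future.complete(reply);
    }

    // Number of requests that are still waiting.
    int size() {
        return pending.size();
    }
}
```

In this sketch the request side calls register() before writing to the
socket, and the side that reads replies calls complete(). A false return from
complete() is how a late reply shows that the original caller has already
given up.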

On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <ta...@gmail.com> wrote:
> James, I think the rest of your puzzle is solved by Camel's async API.
> You might have to check whether your task is done; maybe your
> requestResponse populates some collection of responses and provides
> some API to return the response given a correlationID.
> Stare at the async docs [1] a few more times and I'm sure you'll find
> your answer.
>
> [1] http://camel.apache.org/async.html
>
> Taariq

Re: Socket-based Asynchronous Calls...

Posted by Taariq Levack <ta...@gmail.com>.
James, I think the rest of your puzzle is solved by Camel's async API.
You might have to check whether your task is done; maybe your
requestResponse populates some collection of responses and provides
some API to return the response given a correlationID.
Stare at the async docs [1] a few more times and I'm sure you'll find
your answer.

[1] http://camel.apache.org/async.html

Taariq
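
The "collection of responses" idea above could look roughly like this in
plain Java. ResponseStore and its method names are hypothetical, chosen only
for illustration. In this sketch, whatever consumes "direct:requestResponse"
would call put(), and the waiting side would poll isDone() and then take().

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder (not part of Camel): keeps finished responses by correlation id.
class ResponseStore<R> {
    private final Map<String, R> responses = new ConcurrentHashMap<>();

    // Called once the response (or a timeout marker) for this id is known.
    void put(String correlationId, R response) {
        responses.put(correlationId, response);
    }

    // Lets a caller check whether its task has finished without blocking.
    boolean isDone(String correlationId) {
        return responses.containsKey(correlationId);
    }

    // Removes and returns the response so the store does not grow without bound.
    Optional<R> take(String correlationId) {
        return Optional.ofNullable(responses.remove(correlationId));
    }
}
```

Polling keeps the code simple but makes the caller responsible for checking
back; a callback or Future avoids that at the cost of more bookkeeping.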

On Tue, Aug 16, 2011 at 11:16 PM, James Carman
<ja...@carmanconsulting.com> wrote:
> No worries!  Thank you for your help.  It helped me understand a bit
> more about how these aggregators work.  However, I still don't
> understand how to take care of my problem.  I guess I'm going to have
> to roll my own processor or something.

Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
No worries!  Thank you for your help.  It helped me understand a bit
more about how these aggregators work.  However, I still don't
understand how to take care of my problem.  I guess I'm going to have
to roll my own processor or something.

On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <ta...@gmail.com> wrote:
> Hmmm.
> Maybe others can help with that if it's possible; I haven't had to wrestle with it.
>
> In my case it is actually a CXF service too, but it's asynchronous and I send the response once I have it, indicating either timeout or the actual response.
>
> Sorry I responded to your question without going back to see your other posts.
>
> Taariq
>
> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote:
>
>> In my case, the originating request comes from CXF.  How do I send the
>> aggregated response back to CXF?
>>
>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <ta...@gmail.com> wrote:
>>> The consumer that handles the aggregated/timed-out request or response.
>>>
>>> I have to resend a few times if it's the request, I simply feed it back into "direct:socketRequestRoute" with the header for the number of retry attempts incremented.
>>> If it's the response I can forward to some process.
>>>
>>> Taariq
>>>
>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>
>>>> What's listening on the:
>>>>
>>>> to("direct:requestResponse")
>>>>
>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>> Sure
>>>>>
>>>>> You can of course solve what I've described many ways, but I'll
>>>>> explain using 3 routes as that's what I used.
>>>>>
>>>>> This first route is the main route I mentioned earlier, so you send
>>>>> your socket messages here and it's multicast to both the aggregator
>>>>> and to the socket.
>>>>>
>>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>>>  "someOutboundSocketEndpoint");
>>>>>
>>>>>
>>>>> This next route will aggregate, both requests and responses are sent
>>>>> here as you envisaged.
>>>>> from("direct:requestResponseAggregator")
>>>>>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>>>>                .completionSize(2)
>>>>>                .completionTimeout(5000)
>>>>>                .to("direct:requestResponse");
>>>>> // Here you can send the "aggregated" message. In my case it's only the
>>>>> // response I forward, unless there's a timeout; then I forward the request.
>>>>>
>>>>> Finally the route that consumes the socket responses.
>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>>> // This headerEnricher doesn't have to be a processor; you have many
>>>>> // options to add a header.
>>>>>
>>>>> If that's not clear feel free to ask.
>>>>>
>>>>> Taariq
>>>>>
>>>>>
>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>>> <ja...@carmanconsulting.com> wrote:
>>>>>> Care to share an example?  I'm not picturing it.
>>>>>>
>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>>>> Hi James
>>>>>>>
>>>>>>> I did that too for what it's worth.
>>>>>>> I send the message to a route that forwards to both the aggregator and to the socket.
>>>>>>> When the response comes in I use an enricher to add the ID to the headers and then forward to the aggregator.
>>>>>>>
>>>>>>> Taariq
>>>>>>>
>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>>>>>
>>>>>>>> Willem,
>>>>>>>>
>>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>>>> responses together into one, it would just make sure it matches the
>>>>>>>> correct response with its request.  Does this sound like a valid
>>>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> James
>>>>>>>>
>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <wi...@gmail.com> wrote:
>>>>>>>>> Hi James,
>>>>>>>>>
>>>>>>>>> Camel async process engine already provides the way that you want.
>>>>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>>>>>>
>>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>>>
>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>>>
>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hz...@gmail.com>
>>>>>>>>>>  wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi James,
>>>>>>>>>>>
>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>>>>>>> assume you want to use camel-netty [1] to send messages to your server (if you
>>>>>>>>>>> have your own code that does that, you can use it too, but you'd have to
>>>>>>>>>>> write your own Processor or Component). Iiuic, your scenario is converting a
>>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your exchange as
>>>>>>>>>>> an async in-out and let your framework (Camel) decompose it and compose it
>>>>>>>>>>> back again. I would not keep threads blocked so I believe your best bet is
>>>>>>>>>>> using the Camel async messaging [2] and Futures (look at the examples using
>>>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is stateless so
>>>>>>>>>>> you'll need a correlationId, which you must have already and something to
>>>>>>>>>>> keep your state. A good bet would be jms [3], or you could write your own.
>>>>>>>>>>> If you used jms you would need to use both a correlationId and a replyTo
>>>>>>>>>>> queue.
>>>>>>>>>>>
>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>>>>> less http request threads) to do more of a continuation thing.
>>>>>>>>>>
>>>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>>>>> server process just echoes it back in the response message.
>>>>>>>>>>
>>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>>>>>>>>> the same you can do a second transformation/correlation using a claim-check
>>>>>>>>>>> sort of pattern. If you don't want to use jms you can implement your own (in
>>>>>>>>>>> memory) persistence and correlation. You can also use a resequencer [4] if
>>>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the replies
>>>>>>>>>>> when they become available, and you can control that.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>>>> it.
>>>>>>>>>>
>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>>>>>>> hope this helps.
>>>>>>>>>>> Hadrian
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Willem
>>>>>>>>> ----------------------------------
>>>>>>>>> FuseSource
>>>>>>>>> Web: http://www.fusesource.com
>>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>>>>> Twitter: willemjiang
>>>>>>>>> Weibo: willemjiang
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>

Re: Socket-based Asynchronous Calls...

Posted by Taariq Levack <ta...@gmail.com>.
Hmmm.
Maybe others can help with that, if it's possible; I haven't had to wrestle with it.

In my case it is actually a CXF service too, but it's asynchronous, and I send the response once I have it, indicating either a timeout or the actual response.

Sorry I responded to your question without going back to see your other posts.

Taariq

On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote:

> In my case, the originating request comes from CXF.  How do I send the
> aggregated response back to CXF?
> 
> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <ta...@gmail.com> wrote:
>> The consumer that handles the aggregated/timed-out request or response.
>> 
>> I have to resend a few times if it's the request, I simply feed it back into "direct:socketRequestRoute" with the header for the number of retry attempts incremented.
>> If it's the response I can forward to some process.
>> 
>> Taariq
>> 
>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>> 
>>> What's listening on the:
>>> 
>>> to("direct:requestResponse")
>>> 
>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>> Sure
>>>> 
>>>> You can of course solve what I've described many ways, but I'll
>>>> explain using 3 routes as that's what I used.
>>>> 
>>>> This first route is the main route I mentioned earlier, so you send
>>>> your socket messages here and it's multicast to both the aggregator
>>>> and to the socket.
>>>> 
>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>>  "someOutboundSocketEndpoint");
>>>> 
>>>> 
>>>> This next route will aggregate, both requests and responses are sent
>>>> here as you envisaged.
>>>> from("direct:requestResponseAggregator")
>>>>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>>>                .completionSize(2)
>>>>                .completionTimeout(5000)
>>>>                .to("direct:requestResponse");
>>>> // Here you can send the "aggregated" message. In my case it's only the
>>>> // response I forward, unless there's a timeout; then I forward the request.
>>>> 
>>>> Finally the route that consumes the socket responses.
>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>> // This headerEnricher doesn't have to be a processor; you have many
>>>> // options to add a header.
>>>> 
>>>> If that's not clear feel free to ask.
>>>> 
>>>> Taariq
>>>> 
>>>> 
>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>> <ja...@carmanconsulting.com> wrote:
>>>>> Care to share an example?  I'm not picturing it.
>>>>> 
>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>>> Hi James
>>>>>> 
>>>>>> I did that too for what it's worth.
>>>>>> I send the message to a route that forwards to both the aggregator and to the socket.
>>>>>> When the response comes in I use an enricher to add the ID to the headers and then forward to the aggregator.
>>>>>> 
>>>>>> Taariq
>>>>>> 
>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>>>> 
>>>>>>> Willem,
>>>>>>> 
>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>>> responses together into one, it would just make sure it matches the
>>>>>>> correct response with its request.  Does this sound like a valid
>>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> James
>>>>>>> 
>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <wi...@gmail.com> wrote:
>>>>>>>> Hi James,
>>>>>>>> 
>>>>>>>> Camel async process engine already provides the way that you want.
>>>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>>>>> 
>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>> 
>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>> 
>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hz...@gmail.com>
>>>>>>>>>  wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi James,
>>>>>>>>>> 
>>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>>>>>> assume you want to use camel-netty [1] to send messages to your server (if you
>>>>>>>>>> have your own code that does that, you can use it too, but you'd have to
>>>>>>>>>> write your own Processor or Component). Iiuic, your scenario is converting a
>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your exchange as
>>>>>>>>>> an async in-out and let your framework (Camel) decompose it and compose it
>>>>>>>>>> back again. I would not keep threads blocked so I believe your best bet is
>>>>>>>>>> using the Camel async messaging [2] and Futures (look at the examples using
>>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is stateless so
>>>>>>>>>> you'll need a correlationId, which you must have already and something to
>>>>>>>>>> keep your state. A good bet would be jms [3], or you could write your own.
>>>>>>>>>> If you used jms you would need to use both a correlationId and a replyTo
>>>>>>>>>> queue.
>>>>>>>>>> 
>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>>>> less http request threads) to do more of a continuation thing.
>>>>>>>>> 
>>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>>>> server process just echoes it back in the response message.
>>>>>>>>> 
>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>>>>>>>> the same you can do a second transformation/correlation using a claim-check
>>>>>>>>>> sort of pattern. If you don't want to use jms you can implement your own (in
>>>>>>>>>> memory) persistence and correlation. You can also use a resequencer [4] if
>>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the replies
>>>>>>>>>> when they become available, and you can control that.
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>>> it.
>>>>>>>>> 
>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>>>>>> hope this helps.
>>>>>>>>>> Hadrian
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> Willem
>>>>>>>> ----------------------------------
>>>>>>>> FuseSource
>>>>>>>> Web: http://www.fusesource.com
>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>>>> Twitter: willemjiang
>>>>>>>> Weibo: willemjiang
>>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 

Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
In my case, the originating request comes from CXF.  How do I send the
aggregated response back to CXF?

On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <ta...@gmail.com> wrote:
> The consumer that handles the aggregated/timed-out request or response.
>
> I have to resend a few times if it's the request, I simply feed it back into "direct:socketRequestRoute" with the header for the number of retry attempts incremented.
> If it's the response I can forward to some process.
>
> Taariq
>
> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>
>> What's listening on the:
>>
>> to("direct:requestResponse")
>>
>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <ta...@gmail.com> wrote:
>>> Sure
>>>
>>> You can of course solve what I've described many ways, but I'll
>>> explain using 3 routes as that's what I used.
>>>
>>> This first route is the main route I mentioned earlier, so you send
>>> your socket messages here and it's multicast to both the aggregator
>>> and to the socket.
>>>
>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>  "someOutboundSocketEndpoint");
>>>
>>>
>>> This next route will aggregate, both requests and responses are sent
>>> here as you envisaged.
>>> from("direct:requestResponseAggregator")
>>>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>>                .completionSize(2)
>>>                .completionTimeout(5000)
>>>                .to("direct:requestResponse");
>>> // Here you can send the "aggregated" message. In my case it's only the
>>> // response I forward, unless there's a timeout; then I forward the request.
>>>
>>> Finally the route that consumes the socket responses.
>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>> // This headerEnricher doesn't have to be a processor; you have many
>>> // options to add a header.
>>>
>>> If that's not clear feel free to ask.
>>>
>>> Taariq
>>>
>>>
>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>> <ja...@carmanconsulting.com> wrote:
>>>> Care to share an example?  I'm not picturing it.
>>>>
>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>>> Hi James
>>>>>
>>>>> I did that too for what it's worth.
>>>>> I send the message to a route that forwards to both the aggregator and to the socket.
>>>>> When the response comes in I use an enricher to add the ID to the headers and then forward to the aggregator.
>>>>>
>>>>> Taariq
>>>>>
>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>>>
>>>>>> Willem,
>>>>>>
>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>> responses together into one, it would just make sure it matches the
>>>>>> correct response with its request.  Does this sound like a valid
>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> James
>>>>>>
>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <wi...@gmail.com> wrote:
>>>>>>> Hi James,
>>>>>>>
>>>>>>> Camel async process engine already provides the way that you want.
>>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>>>>
>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>
>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>
>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hz...@gmail.com>
>>>>>>>>  wrote:
>>>>>>>>>
>>>>>>>>> Hi James,
>>>>>>>>>
>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>>>>> assume you want to use camel-netty [1] to send messages to your server (if you
>>>>>>>>> have your own code that does that, you can use it too, but you'd have to
>>>>>>>>> write your own Processor or Component). Iiuic, your scenario is converting a
>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your exchange as
>>>>>>>>> an async in-out and let your framework (Camel) decompose it and compose it
>>>>>>>>> back again. I would not keep threads blocked so I believe your best bet is
>>>>>>>>> using the Camel async messaging [2] and Futures (look at the examples using
>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is stateless so
>>>>>>>>> you'll need a correlationId, which you must have already and something to
>>>>>>>>> keep your state. A good bet would be jms [3], or you could write your own.
>>>>>>>>> If you used jms you would need to use both a correlationId and a replyTo
>>>>>>>>> queue.
>>>>>>>>>
>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>>>
>>>>>>>>
>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>>> less http request threads) to do more of a continuation thing.
>>>>>>>>
>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>>> server process just echoes it back in the response message.
>>>>>>>>
>>>>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>>>>>>> the same you can do a second transformation/correlation using a claim-check
>>>>>>>>> sort of pattern. If you don't want to use jms you can implement your own (in
>>>>>>>>> memory) persistence and correlation. You can also use a resequencer [4] if
>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the replies
>>>>>>>>> when they become available, and you can control that.
>>>>>>>>>
>>>>>>>>
>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>> it.
>>>>>>>>
>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>>>>> hope this helps.
>>>>>>>>> Hadrian
>>>>>>>>>
>>>>>>>>
>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Willem
>>>>>>> ----------------------------------
>>>>>>> FuseSource
>>>>>>> Web: http://www.fusesource.com
>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>>> Twitter: willemjiang
>>>>>>> Weibo: willemjiang
>>>>>>>
>>>>>
>>>>
>>>
>

Re: Socket-based Asynchronous Calls...

Posted by Taariq Levack <ta...@gmail.com>.
A consumer that handles whatever the aggregator emits: the response, or the original request if it timed out.

If it's the request, I have to resend it a few times, so I simply feed it back into "direct:socketRequestRoute" with the retry-attempts header incremented.
If it's the response, I can forward it to some process.
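
The decision that consumer makes can be sketched in plain Java as
below. This is only an illustration under assumptions, not the actual
code behind the routes: the header names "retryAttempts" and
"isResponse", the retry limit of 3, and the endpoints
"direct:handleResponse" and "direct:handleTimeout" are all made up for
the example. Only "direct:socketRequestRoute" comes from the routes
discussed in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical router for what the aggregator emits: a response or a timed-out request. */
class RetryRouter {
    static final String RETRY_HEADER = "retryAttempts";    // assumed header name
    static final String IS_RESPONSE_HEADER = "isResponse"; // assumed marker set on responses
    static final int MAX_RETRIES = 3;                      // assumed retry limit

    /** Returns the endpoint the message should go to next, bumping the retry header if needed. */
    static String nextEndpoint(Map<String, Object> headers) {
        if (Boolean.TRUE.equals(headers.get(IS_RESPONSE_HEADER))) {
            return "direct:handleResponse"; // hypothetical downstream endpoint
        }
        int attempts = (Integer) headers.getOrDefault(RETRY_HEADER, 0);
        if (attempts >= MAX_RETRIES) {
            return "direct:handleTimeout"; // hypothetical: give up and report the timeout
        }
        // Timed-out request: count the attempt and send it round again.
        headers.put(RETRY_HEADER, attempts + 1);
        return "direct:socketRequestRoute";
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(nextEndpoint(headers) + " after attempt " + headers.get(RETRY_HEADER));
    }
}
```

Capping the retries matters here because every resend goes back through
the aggregator and can time out again.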

Taariq

On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:

> What's listening on the:
> 
> to("direct:requestResponse")
> 
> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <ta...@gmail.com> wrote:
>> Sure
>> 
>> You can of course solve what I've described many ways, but I'll
>> explain using 3 routes as that's what I used.
>> 
>> This first route is the main route I mentioned earlier, so you send
>> your socket messages here and it's multicast to both the aggregator
>> and to the socket.
>> 
>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>  "someOutboundSocketEndpoint");
>> 
>> 
>> This next route will aggregate, both requests and responses are sent
>> here as you envisaged.
>> from("direct:requestResponseAggregator")
>>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>                .completionSize(2)
>>                .completionTimeout(5000)
>>                .to("direct:requestResponse");
>> // Here you can send the "aggregated" message. In my case it's only the
>> // response I forward, unless there's a timeout; then I forward the request.
>> 
>> Finally the route that consumes the socket responses.
>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>> // This headerEnricher doesn't have to be a processor; you have many
>> // options to add a header.
>> 
>> If that's not clear feel free to ask.
>> 
>> Taariq
>> 
>> 
>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>> <ja...@carmanconsulting.com> wrote:
>>> Care to share an example?  I'm not picturing it.
>>> 
>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <ta...@gmail.com> wrote:
>>>> Hi James
>>>> 
>>>> I did that too for what it's worth.
>>>> I send the message to a route that forwards to both the aggregator and to the socket.
>>>> When the response comes in I use an enricher to add the ID to the headers and then forward to the aggregator.
>>>> 
>>>> Taariq
>>>> 
>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>> 
>>>>> Willem,
>>>>> 
>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>> "server" on the other end of this situation.  I thought about using an
>>>>> aggregator to make sure the response gets matched up with the request
>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>> responses together into one, it would just make sure it matches the
>>>>> correct response with its request.  Does this sound like a valid
>>>>> approach?  If so, how the heck do I go about it? :)
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> James
>>>>> 
>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <wi...@gmail.com> wrote:
>>>>>> Hi James,
>>>>>> 
>>>>>> Camel async process engine already provides the way that you want.
>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>>> 
>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>> 
>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>> 
>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hz...@gmail.com>
>>>>>>>  wrote:
>>>>>>>> 
>>>>>>>> Hi James,
>>>>>>>> 
>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>>>> assume you want to use camel-netty [1] to send messages to your server (if you
>>>>>>>> have your own code that does that, you can use it too, but you'd have to
>>>>>>>> write your own Processor or Component). Iiuic, your scenario is converting a
>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your exchange as
>>>>>>>> an async in-out and let your framework (Camel) decompose it and compose it
>>>>>>>> back again. I would not keep threads blocked so I believe your best bet is
>>>>>>>> using the Camel async messaging [2] and Futures (look at the examples using
>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is stateless so
>>>>>>>> you'll need a correlationId, which you must have already and something to
>>>>>>>> keep your state. A good bet would be jms [3], or you could write your own.
>>>>>>>> If you used jms you would need to use both a correlationId and a replyTo
>>>>>>>> queue.
>>>>>>>> 
>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>> 
>>>>>>> 
>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>> less http request threads) to do more of a continuation thing.
>>>>>>> 
>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>> server process just echoes it back in the response message.
>>>>>>> 
>>>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>>>>>> the same you can do a second transformation/correlation using a claim-check
>>>>>>>> sort of pattern. If you don't want to use jms you can implement your own (in
>>>>>>>> memory) persistence and correlation. You can also use a resequencer [4] if
>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the replies
>>>>>>>> when they become available, and you can control that.
>>>>>>>> 
>>>>>>> 
>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>> it.
>>>>>>> 
>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>>>> hope this helps.
>>>>>>>> Hadrian
>>>>>>>> 
>>>>>>> 
>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Willem
>>>>>> ----------------------------------
>>>>>> FuseSource
>>>>>> Web: http://www.fusesource.com
>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>> Twitter: willemjiang
>>>>>> Weibo: willemjiang
>>>>>> 
>>>> 
>>> 
>> 

Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
What's listening on the:

to("direct:requestResponse")

On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <ta...@gmail.com> wrote:
> Sure
>
> You can of course solve what I've described many ways, but I'll
> explain using 3 routes as that's what I used.
>
> This first route is the main route I mentioned earlier, so you send
> your socket messages here and it's multicast to both the aggregator
> and to the socket.
>
> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>  "someOutboundSocketEndpoint");
>
>
> This next route will aggregate, both requests and responses are sent
> here as you envisaged.
> from("direct:requestResponseAggregator")
>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>                .completionSize(2)
>                .completionTimeout(5000)
>                .to("direct:requestResponse");
> // Here you can send the "aggregated" message. In my case it's only the
> // response I forward, unless there's a timeout; then I forward the request.
>
> Finally the route that consumes the socket responses.
> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
> // This headerEnricher doesn't have to be a processor; you have many
> // options to add a header.
>
> If that's not clear feel free to ask.
>
> Taariq
>
>

Re: Socket-based Asynchronous Calls...

Posted by Taariq Levack <ta...@gmail.com>.
Sure

You can of course solve what I've described many ways, but I'll
explain using 3 routes as that's what I used.

This first route is the main route I mentioned earlier, so you send
your socket messages here and it's multicast to both the aggregator
and to the socket.

from("direct:socketRequestRoute")
    .multicast()
    .to("direct:requestResponseAggregator", "someOutboundSocketEndpoint");


This next route does the aggregation; both requests and responses are
sent here, as you envisaged.

from("direct:requestResponseAggregator")
    .aggregate(header("someCorrellationId"), requestResponseAggregator)
        .completionSize(2)
        .completionTimeout(5000)
        .to("direct:requestResponse");
// Here you can send the "aggregated" message. In my case I forward only
// the response, unless there's a timeout; then I forward the request.

Finally, the route that consumes the socket responses.

from(someInboundSocketEndpoint)
    .processRef("headerEnricher")
    .to("direct:requestResponseAggregator");
// This headerEnricher doesn't have to be a processor; you have many
// options for adding a header.

If that's not clear feel free to ask.

Taariq
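
For illustration, here is a minimal plain-Java sketch of the decision a
requestResponseAggregator like the one above might make. It is not Taariq's
code and it does not use the Camel API: SocketMessage, RequestResponsePairer,
offer and timeout are hypothetical names. It assumes each correlation ID sees
one request and at most one response, mirroring completionSize(2) and the
timeout fallback described above.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical message type: a correlation ID, a request/response flag and a body. */
final class SocketMessage {
    final String correlationId;
    final boolean response;
    final String body;

    SocketMessage(String correlationId, boolean response, String body) {
        this.correlationId = correlationId;
        this.response = response;
        this.body = body;
    }
}

/** Pairs a request with its response by correlation ID (illustrative only). */
final class RequestResponsePairer {
    private final Map<String, SocketMessage> pending = new ConcurrentHashMap<>();

    /** Returns the response once both halves of a pair have arrived, otherwise empty. */
    Optional<SocketMessage> offer(SocketMessage incoming) {
        SocketMessage earlier = pending.putIfAbsent(incoming.correlationId, incoming);
        if (earlier == null) {
            return Optional.empty(); // first half of the pair: keep waiting
        }
        pending.remove(incoming.correlationId); // pair complete (size 2)
        return Optional.of(incoming.response ? incoming : earlier);
    }

    /** Timeout fallback: hands back whatever is still waiting, normally the request. */
    Optional<SocketMessage> timeout(String correlationId) {
        return Optional.ofNullable(pending.remove(correlationId));
    }
}
```

In a real route the same decision would live in a Camel AggregationStrategy,
with the timeout driven by completionTimeout rather than an explicit call.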



Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
Care to share an example?  I'm not picturing it.


Re: Socket-based Asynchronous Calls...

Posted by Taariq Levack <ta...@gmail.com>.
Hi James

I did that too for what it's worth.
I send the message to a route that forwards to both the aggregator and to the socket. 
When the response comes in I use an enricher to add the ID to the headers and then forward to the aggregator.

Taariq


Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
Willem,

Thank you for your help.  I don't think this is doing exactly what I
need, though.  The real trick here is the asynchronous nature of the
"server" on the other end of this situation.  I thought about using an
aggregator to make sure the response gets matched up with the request
using a correlation id.  The aggregator wouldn't aggregate multiple
responses together into one; it would just make sure it matches the
correct response with its request.  Does this sound like a valid
approach?  If so, how the heck do I go about it? :)

Thanks,

James
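
The matching described above can be sketched without Camel at all. The
following is a minimal sketch under stated assumptions, not something proposed
in the thread: PendingReplies, expect and deliver are hypothetical names,
correlation IDs are assumed to be unique per in-flight request, and message
bodies are plain strings. The sender registers the ID before writing to the
socket, and the socket reader completes the matching future whenever the reply
arrives, in any order.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Matches out-of-order replies to waiting callers by correlation ID (illustrative only). */
final class PendingReplies {
    private final Map<String, CompletableFuture<String>> waiting = new ConcurrentHashMap<>();

    /** Call before sending the request; the returned future completes with the reply. */
    CompletableFuture<String> expect(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (waiting.putIfAbsent(correlationId, reply) != null) {
            throw new IllegalStateException("duplicate correlation id: " + correlationId);
        }
        return reply;
    }

    /** Call from the socket reader; returns false if nobody is waiting for this ID. */
    boolean deliver(String correlationId, String body) {
        CompletableFuture<String> reply = waiting.remove(correlationId);
        return reply != null && reply.complete(body);
    }
}
```

A calling thread that has to block can call get(timeout, unit) on the returned
future; a non-blocking caller can attach a callback with thenAccept instead.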


Re: Socket-based Asynchronous Calls...

Posted by Willem Jiang <wi...@gmail.com>.
Hi James,

Camel's async processing engine already provides what you want.
You can take a look at the camel-cxf code [1][2] for an example.

[1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
[2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup



-- 
Willem
----------------------------------
FuseSource
Web: http://www.fusesource.com
Blog:    http://willemjiang.blogspot.com (English)
          http://jnn.javaeye.com (Chinese)
Twitter: willemjiang
Weibo: willemjiang

Re: Socket-based Asynchronous Calls...

Posted by James Carman <ja...@carmanconsulting.com>.
On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian <hz...@gmail.com> wrote:
> Hi James,
>
> I hope I understand your scenario correctly. Here are a few thoughts. I assume you want to use camel-netty [1] to send messages to your server (if you have your own code that does that, you can use it too, but you'd have to write your own Processor or Component). IIUC, your scenario is converting two in-only exchanges into one async in-out MEP. You should then treat your exchange as an async in-out and let your framework (Camel) decompose it and compose it back again. I would not keep threads blocked, so I believe your best bet is using the Camel async messaging [2] and Futures (look at the examples using asyncSend* and asyncCallback*). The issue is that Camel is stateless, so you'll need a correlationId, which you must have already, and something to keep your state. A good bet would be JMS [3], or you could write your own. If you used JMS you would need to use both a correlationId and a replyTo queue.
>
> from("jms:request-queue").to("netty:output?=correlationId");
> from("netty:input").to("jms:replyTo-queue");
>

Perhaps a bit more information might be appropriate here.  Eventually,
I'd like to "expose" this route via web services (using CXF of
course).  So, I would need to either block the request thread, waiting
for a reply, or perhaps check out the new Servlet 3.0 asynchronous
processing stuff (I'm thinking this might help us get more done with
fewer HTTP request threads) to do more of a continuation thing.

We already have a correlation id.  The "protocol" requires one and the
server process just echoes it back in the response message.

> You may have to play a bit with the correlationId, and if you cannot use the same one you can do a second transformation/correlation using a claim-check sort of pattern. If you don't want to use JMS you can implement your own (in-memory) persistence and correlation. You can also use a resequencer [4] if you want to enforce the order. If you use asyncCallback, you get the replies when they become available, and you can control that.
>

I don't think a resequencer is necessary.  I don't want to guarantee
the ordering.  I'm mostly interested in throughput here.  So, if a
message comes in after another, but it can be processed faster, so be
it.

> It's an interesting scenario, I'll definitely give it more thought, but I hope this helps.
> Hadrian
>

You have been very helpful.  Thank you for taking the time!

Re: Socket-based Asynchronous Calls...

Posted by Zbarcea Hadrian <hz...@gmail.com>.
Hi James,

I hope I understand your scenario correctly. Here are a few thoughts. I assume you want to use camel-netty [1] to send messages to your server (if you have your own code that does that, you can use it too, but you'd have to write your own Processor or Component). IIUC, your scenario is converting two in-only exchanges into one async in-out MEP. You should then treat your exchange as an async in-out and let your framework (Camel) decompose it and compose it back again. I would not keep threads blocked, so I believe your best bet is using the Camel async messaging [2] and Futures (look at the examples using asyncSend* and asyncCallback*). The issue is that Camel is stateless, so you'll need a correlationId, which you must have already, and something to keep your state. A good bet would be JMS [3], or you could write your own. If you used JMS you would need to use both a correlationId and a replyTo queue.

from("jms:request-queue").to("netty:output?=correlationId");
from("netty:input").to("jms:replyTo-queue");

You may have to play a bit with the correlationId, and if you cannot use the same one you can do a second transformation/correlation using a claim-check sort of pattern. If you don't want to use JMS you can implement your own (in-memory) persistence and correlation. You can also use a resequencer [4] if you want to enforce the order. If you use asyncCallback, you get the replies when they become available, and you can control that.

It's an interesting scenario, I'll definitely give it more thought, but I hope this helps.
Hadrian


[1] http://camel.apache.org/netty.html
[2] http://camel.apache.org/async.html
[3] http://camel.apache.org/jms.html
[4] http://camel.apache.org/resequencer.html
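
The in-memory claim-check idea mentioned above can be sketched roughly as
follows. This is only an illustration under assumptions, not code from the
thread or from Camel: ClaimCheckStore, checkIn and claim are hypothetical
names, tickets are generated from a simple counter, and nothing is persisted
or expired. The per-request state is parked under a ticket, the ticket travels
as the second correlation key, and the state is redeemed when the reply comes
back.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** In-memory claim check: park state under a ticket, redeem it once (illustrative only). */
final class ClaimCheckStore<T> {
    private final Map<String, T> checkedIn = new ConcurrentHashMap<>();
    private final AtomicLong nextTicket = new AtomicLong();

    /** Stores a non-null payload and returns the ticket to send along with the request. */
    String checkIn(T payload) {
        String ticket = "claim-" + nextTicket.incrementAndGet();
        checkedIn.put(ticket, payload);
        return ticket;
    }

    /** Redeems a ticket exactly once; empty if it is unknown or already claimed. */
    Optional<T> claim(String ticket) {
        return Optional.ofNullable(checkedIn.remove(ticket));
    }
}
```

A production version would also need a way to evict tickets whose replies
never arrive, which this sketch leaves out.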

