You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@cxf.apache.org by Sergey Beryozkin <se...@progress.com> on 2008/10/24 15:21:15 UTC

Jetty Continuations in CXF

Hi

I'd like to continue the discussion on how to handle Jetty continuations[1] in CXF[2] here.

In short the requirement is for CXF to be able to handle the application code (ServiceMix JBI consumers served by ServiceMix CXF BindingComponent in this case) doing explicit continuations.

Ex. CXF receives a request on a Jetty thread, creates an exchange and sends it further along to the consumer. Consumer is about to do some work so it spawns some activity or checks for some event and then does continuation.suspend(). This results in a specific Runtime exception being thrown. 

The challenge is how to 'suspend' the interception chain, let this exception propagate down to the Jetty stack so that it can free the thread and put this pending request in its internal queue, and then resume it when a consumer code decides to do continuation.resume().

See [3] for a more detailed description of the issues.

Dan, here're some comments :

1. "something would need to be done to allow the "suspend" exception thing to propogate up, 
but without taking a jetty dependency into the core."

I guess the basic thing we can do is to check the class name of the exception (like exception.getClass().equals("JettyException")), and if it matches the expected name then we can wrap up this exception in a SuspendedFault exception, to be recognized by the rest of CXF runtime

2. Now, if the above can be figured out, the next problem arises: when 
the "trigger" to wake up the continuation occurs

I think we can can do in JettyDestination omething similar to what is done in SMX. When getting a SuspendedFault exception, we can extract from it the original continuation instance or else we can do ContinuationSupport.getContinuation(request) which should return us the instance. At this point we can use it as a ket to store the current exchange plus all the other info we may need.

When the user/application code does continuation.resume(), the Jetty thread will come back and we will use the ContinuationSupport.getContinuation(request) to get us the active continuation and use it to extract the suspended exchange and proceed from there, say we'll call PhaseInterceptorPhase.resume(), etc, something along the lines you suggested

 
3. Basically, to do this "right", we'd need to audit pretty much everything to 
make sure nothing is stored on the stack and is "resumable". Once that is 
done, the rest is relatively easy. 

Yea - probably can be the quite challenging


Thoughts ?

Cheers, Sergey




[1] http://docs.codehaus.org/display/JETTY/Continuations
[2] https://issues.apache.org/jira/browse/CXF-1835
[3] https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361#action_12642361

Re: Jetty Continuations in CXF

Posted by Daniel Kulp <dk...@apache.org>.
On Friday 24 October 2008 9:46:14 am Guillaume Nodet wrote:
> Btw, is the JMS transport able to not block a thread too ?  

With my commits last week, it can on the client side.   Not on the server side 
though.

> Given the 
> very nature of JMS, this should be even more feasible than for HTTP.

Right.  Which is why I want the API's for this to not be specific to Jetty.

Dan


> On Fri, Oct 24, 2008 at 3:42 PM, Daniel Kulp <dk...@apache.org> wrote:
> > On Friday 24 October 2008 9:27:01 am Sergey Beryozkin wrote:
> >> Dan, here're some comments :
> >>
> >> 1. "something would need to be done to allow the "suspend" exception
> >> thing to propogate up, but without taking a jetty dependency into the
> >> core."
> >>
> >> I guess the basic thing we can do is to check the class name of the
> >> exception (like exception.getClass().equals("JettyException")), and if
> >> it matches the expected name then we can wrap up this exception in a
> >> SuspendedFault exception, to be recognized by the rest of CXF runtime
> >
> > No.   We don't want that.   Whatever we do should work for other
> > transports as well like JMS.  Thus, this shouldn't be tied to jetty
> > continuations directly.
> >
> > Most likely, we could add a "suspend()" method to PhaseInterceptorChain
> > that would do something very similar and throw a "SuspendException" or
> > something in the same package as PhaseInterceptorChain.   That would get
> > propogated back to the JettyDestination that could then call the jetty
> > things.   The JMS transport could just catch it and more or less ignore
> > it.    We'd then have to add a "resume()" method to the chain which would
> > call back onto a listener that the transport provides.   Jetty would just
> > call the jetty resume stuff. JMS would probably put a runnable on the
> > workqueue to restart the chain.
> >
> > Also, suspend() would need to check if there is a listener.  If not, it
> > should not throw the exception.   Thus, the servlet transport and CORBA
> > stuff that couldn't do this would pretty much just ignore it.
> >
> > Basically, this needs to be done in such a way that it CAN work for the
> > non-jetty cases.   However, it also needs to be done in a way that
> > doesn't affect existing transports.
> >
> > Dan
> >
> >> 2. Now, if the above can be figured out, the next problem arises: when
> >> the "trigger" to wake up the continuation occurs
> >>
> >> I think we can can do in JettyDestination omething similar to what is
> >> done in SMX. When getting a SuspendedFault exception, we can extract
> >> from it the original continuation instance or else we can do
> >> ContinuationSupport.getContinuation(request) which should return us the
> >> instance. At this point we can use it as a ket to store the current
> >> exchange plus all the other info we may need.
> >>
> >> When the user/application code does continuation.resume(), the Jetty
> >> thread will come back and we will use the
> >> ContinuationSupport.getContinuation(request) to get us the active
> >> continuation and use it to extract the suspended exchange and proceed
> >> from there, say we'll call PhaseInterceptorPhase.resume(), etc,
> >> something along the lines you suggested
> >>
> >>
> >> 3. Basically, to do this "right", we'd need to audit pretty much
> >> everything to make sure nothing is stored on the stack and is
> >> "resumable". Once that is done, the rest is relatively easy.
> >>
> >> Yea - probably can be the quite challenging
> >>
> >>
> >> Thoughts ?
> >>
> >> Cheers, Sergey
> >>
> >>
> >>
> >>
> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
> >> [2] https://issues.apache.org/jira/browse/CXF-1835
> >> [3]
> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361
> >>#ac tion_12642361
> >
> > --
> > Daniel Kulp
> > dkulp@apache.org
> > http://dankulp.com/blog



-- 
Daniel Kulp
dkulp@apache.org
http://dankulp.com/blog

Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
Hi

>
> No.   I actually expect this to be more important for the JMS folks than the
> HTTP folks which is why it needs to be transport independent.   Basically,
> MOST HTTP users expect a fairly synchronous invokation path.   That's pretty
> much how its "always been" so people using HTTP, unless they specifically
> know about the jetty continuations, wouldn't even think about it.    JMS
> folks on the other hand are much more in tuned to the ideas around
> asynchronous communication and events.

CXF-1835 reads :

"Use Jetty Continuations to implement asynchronous HTTP processing"

Thus I believe that a user who opened this JIRA is after using it with HTTP.

>
> We also promote the idea that the same "impl" can be used on multiple
> transports/bindings.   Thus, the continuation code needs to be abstracted out
> to hide the underlying transport.

I don't object to a transport independence - but I'd argue it's not something which we should focus upon first, as far as this 
specific JIRA is concerned. Lets solve the actual problem the user raised rather than try to come up wuth a universal solution.

OK, just would like to check - ate you thinking of us introducing our own ContinuationSupport and Continuation interfaces ? I'm not 
sure it's the right approach - but i may be wrong

>
> Also, I've started looking into the MINA based HTTP stuff (mostly for Async on
> the client side, but also as a possible Jetty replacement).   Thus, this
> needs to be abstracted out so that if we do replace Jetty, existing code
> would still work.
>
>
>> I think we have two scenarious to look at. First one is when a CXF acts as
>> a request provider for some ServiceMix components (implicitly serving as
>> 'application code') - which is what CXF-1835 is mostly about.
>
> Right.  Which can be JMS or HTTP or CORBA or .....     The ServiceMix
> components pretty much do exactly what an Impl would do except they have
> direct access to the message instead of the WebServiceContext.   Thus, either
> way, we need to abstract it out a bit.

It can be but the user is after doing it with HTTP.

>
>
>> Another one is about CXF service application code doing Continuations
>>
>> I think it does - but I'd just like to start with just HTTP in mind, just
>> to get going and consider a transport portability issue at the the next
>> stage.
>
> I'd have to say -1 to that.   Or at least it doesn't get merged up into any of
> the 2.1.x/2.0.x branches until it's completely abstracted.   Once it gets
> merged up and potentially released, we need to support it as is relatively
> indefinitely and I don't want to support a "half baked" solution that is tied
> directly to a particular http implementation.

No worries. Let me have the test working first. Furthermore, I don't share your concern about us supporting something
indefinitely in this case. For this specific JIRA there's no urgent need to introduce any new interface rather it's about some 
internal modifications - hence no need to support public interfaces.

This individual JIRA is not about user explictly interacting with continuations (scenario1 as I described). What you suggest 
(abstracting it all) is about a user interacting explictly with continuations (scenario2). As I said - I don't object to it - but 
I'd be surprised if we bundled both requirements into a single JIRA.

Lets open another one (letting users do continuation support in a transport-portable way) and look at it independently - IMHO it 
would be a faster and better way to tackle the whole continuations thing

Just my 2c
Sergey

>
>
> -- 
> Daniel Kulp
> dkulp@apache.org
> http://dankulp.com/blog 


Re: Jetty Continuations in CXF

Posted by Daniel Kulp <dk...@apache.org>.
On Monday 03 November 2008 9:41:59 am Sergey Beryozkin wrote:
> Hi,
>
>
> It's an interesting idea. Worth having it in mind. However, I'm thinking,
> how reasonable it is to expect that a user would want to write a
> Continuations code portable across multiple transports ? I'd imagine that a 
> user which wishes to do explicit continuations would do them with HTTP
> transport in mind, well, at least now that Jetty Continuations are
> available, 

No.   I actually expect this to be more important for the JMS folks than the 
HTTP folks which is why it needs to be transport independent.   Basically, 
MOST HTTP users expect a fairly synchronous invokation path.   That's pretty 
much how its "always been" so people using HTTP, unless they specifically 
know about the jetty continuations, wouldn't even think about it.    JMS 
folks on the other hand are much more in tuned to the ideas around 
asynchronous communication and events.    

> with Servlet 3.0 supporting suspended invocations too. Otherwise 
> we'd need to come up with our own ContiniationsSupport and Continuation
> classes - that's why would user use JettyContiations support and expect ths
> code work say with JMS or indeed with some other transport other than HTTP
> ?

We also promote the idea that the same "impl" can be used on multiple 
transports/bindings.   Thus, the continuation code needs to be abstracted out 
to hide the underlying transport. 

Also, I've started looking into the MINA based HTTP stuff (mostly for Async on 
the client side, but also as a possible Jetty replacement).   Thus, this 
needs to be abstracted out so that if we do replace Jetty, existing code 
would still work.


> I think we have two scenarious to look at. First one is when a CXF acts as
> a request provider for some ServiceMix components (implicitly serving as
> 'application code') - which is what CXF-1835 is mostly about.

Right.  Which can be JMS or HTTP or CORBA or .....     The ServiceMix 
components pretty much do exactly what an Impl would do except they have 
direct access to the message instead of the WebServiceContext.   Thus, either 
way, we need to abstract it out a bit.


> Another one is about CXF service application code doing Continuations
> 
> I think it does - but I'd just like to start with just HTTP in mind, just
> to get going and consider a transport portability issue at the the next
> stage.

I'd have to say -1 to that.   Or at least it doesn't get merged up into any of 
the 2.1.x/2.0.x branches until it's completely abstracted.   Once it gets 
merged up and potentially released, we need to support it as is relatively 
indefinitely and I don't want to support a "half baked" solution that is tied 
directly to a particular http implementation.  


-- 
Daniel Kulp
dkulp@apache.org
http://dankulp.com/blog

Re: NPE in system JMS test

Posted by Daniel Kulp <dk...@apache.org>.
Yea, I think it's caused by the move from Spring 2.5.4 to 2.5.6.    The Spring 
JMS stuff doesn't seem to tolerate an ungraceful shutdown which is what I 
think is happening.   I want to dig in a bit furthur and possibly log a bug 
with Spring, just haven't had the time.

Dan


On Tuesday 11 November 2008 1:15:43 pm Sergey Beryozkin wrote:
> Hi, seeing this NPE with the latest trunk :
>
>  T E S T S
> -------------------------------------------------------
> Running
> org.apache.cxf.systest.multitransport.MultiTransportClientServerTest
> Exception in thread "DefaultMessageListenerContainer-1"
> java.lang.NullPointerExc eption
>         at java.lang.String.indexOf(String.java:1564)
>         at java.lang.String.indexOf(String.java:1546)
>         at
> org.springframework.jms.support.JmsUtils.buildExceptionMessage(JmsUti
> ls.java:255)
>         at
> org.springframework.jms.listener.DefaultMessageListenerContainer.refr
> eshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:799) at
> org.springframework.jms.listener.DefaultMessageListenerContainer.reco
> verAfterListenerSetupFailure(DefaultMessageListenerContainer.java:767) at
> org.springframework.jms.listener.DefaultMessageListenerContainer$Asyn
> cMessageListenerInvoker.run(DefaultMessageListenerContainer.java:898) at
> java.lang.Thread.run(Thread.java:595)
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 22.425 sec
>
>
> looks like it's swallowed - can someone else see it ?
>
> Sergey
>
>
> ----- Original Message -----
> From: "Sergey Beryozkin" <se...@progress.com>
> To: "Daniel Kulp" <dk...@apache.org>; <de...@cxf.apache.org>
> Sent: Tuesday, November 11, 2008 5:51 PM
> Subject: Re: Jetty Continuations in CXF
>
> > Hi
> >
> >>> I have 10 threads involved, 5 control ones + 5 application ones, I see
> >>> a loss of message approximately once in 5 cases. The fact that
> >>> cont.resume() is done virtually immediately after cont.suspend() can
> >>> explain it.
> >>
> >> Without seeing your code, I cannot really offer valid suggestions, but
> >> I'll try....   :-)
> >
> > I guess having it all on a branch would be handy then :-)
> >
> >> One thought was in the Continuation object, record if "resume()" has
> >> been called and if it's been callled by the time the stack unwinds back
> >> into the Http transport, just re-dispatch immediately.   Either that or
> >> have the resume block until the http transport sets a "ready to resume"
> >> flag just before it allows the exception to flow back into jetty.
> >
> > I have 2 tests.
> >
> > In one test an application server code interacts with a wrapper, both
> > when getting a continuation instance and when calling suspend/resume on
> > it (as suggested by yourself earlier in this thread). In this case, under
> > the hood, an inbound message is associated with a continuation instance
> > before suspend() is called on it. Thus even if the resulting exception
> > does not reach Jetty Destination in time before continuation.resume() is
> > called by a control thread, the message is not lost when the HTTP request
> > is resumed as that HTTP request had this continuation instance associated
> > with it at a time ContinuationsSupport.getContinuations(request) was
> > called.
> >
> > In other test which I believe represents an integration scenario with SMX
> > better, an application server code calls Jetty
> > ContinuationsSupport.getContinuations(request) followed by
> > continuation.suspend(). Now, in this case, before a (Jetty RetryRequest)
> > runtime exception reaches a catch block in AbstractInvoker (where I try
> > to associate a message with continuation), one or two control threads
> > manage to squeeze in and call resume() before catch block has even been
> > processed. So by the time the wrapped exception reaches JettyDestination
> > a request with a resumed continuation has already come back...
> >
> > Does this explanation for a second case and the associated race condition
> > sounds reasonable ?
> >
> > Cheers, Sergey
> >
> >> Dan
> >>
> >>> Cheers, Sergey
> >>>
> >>> > That said, I'm now trying to inject a message as a custom
> >>> > continuation object (while preserving the original one if any, both
> >>> > ways) as early as possible, in AbstractInvoker, so the time window at
> >>> > which the race condition I talked about earlier can cause the loss of
> >>> > the original message, is extremely small the time it taked for the
> >>> > continuation.suspend() exception to reach a catch block in
> >>> > AbstractInvoker.
> >>> >
> >>> > Cheers, Sergey
> >>> >
> >>> >> Hi,
> >>> >>
> >>> >> I did some system testing with Jetty continuations and it's going
> >>> >> not too bad. Here's one issue which I've encountered which might or
> >>> >> might not be a problem in cases where continuations are ustilized
> >>> >> directly (that is without our wrappers), as in case of say
> >>> >> ServiceMix CXF binding component.
> >>> >>
> >>> >> The problem is that when continuation.suspend(timeout) has been
> >>> >> called, a resulting RuntimeException might not reach CXF
> >>> >> JettyDestination (such that the original message with its phase
> >>> >> chain can be preserved until the request is resumed) if some other
> >>> >> application thread calls continuation.resume() or continuation
> >>> >> suspend timeout expires.
> >>> >>
> >>> >> In case of ServiceMix the latter is a theoretical possibility at the
> >>> >> least. I can see in its code this timeout is configured, but if this
> >>> >> timeout is in the region of up to 1 sec or so then it's feasible
> >>> >> that with a heavy  workload the race condition described above might
> >>> >> come to life.
> >>> >>
> >>> >> That said, as part of my test, I found that even when such condition
> >>> >> occurs, the 'worst' thing which can happen is that a new message and
> >>> >> a new chain are created, that is, the request is not resumed from a
> >>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
> >>> >> request alltogether, but it all works nonetheless, as all the stack
> >>> >> variables used in various interceptors in my given test at least are
> >>> >> all obtained from a message. The only downside is that that the work
> >>> >> which has already been done earlier as part of handling the
> >>> >> suspended request is repeated again by the interceptors. It can
> >>> >> cause issues though in cases when some interceptors have sideeffects
> >>> >> as part of handling a given input request, say modify a db, etc
> >>> >>
> >>> >> Now, this race condition can be safely avoided if a wrapper proposed
> >>> >> by Dan is used by a server application code as the message can be
> >>> >> preserved immediately at a point a user calls suspend on our
> >>> >> wrapper, so without further doubts I've prototyped it too. It's not
> >>> >> possible for SMX components though
> >>> >>
> >>> >> Comments ?
> >>> >>
> >>> >> Cheers, Sergey
> >>> >>
> >>> >>> I guess my thinking was to tie the continutations directly to the
> >>> >>> PhaseInterceptorChain (since that is going to need to know about
> >>> >>> them anyway).   However, I suppose it could easily be done with a
> >>> >>> new interface. Probably the best thing to do is to stub out a
> >>> >>> sample usecase.   So here goes.....
> >>> >>>
> >>> >>> Lets take a "GreetMe" web service that in the greetMe method will
> >>> >>> call off asynchrously to some JMS service to actually get the
> >>> >>> result.
> >>> >>>
> >>> >>> @Resource(name = "jmsClient")
> >>> >>> Greeter jmsGreeter
> >>> >>> @Resource
> >>> >>> WebServiceContext context;
> >>> >>> public String greetMe(String arg) {
> >>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
> >>> >>>              context.get(ContinuationSupport.class.getName());
> >>> >>>     if (contSupport == null) {
> >>> >>>          //continuations not supported, must wait
> >>> >>>          return jmsGreeter.greetMe(arg);
> >>> >>>     }
> >>> >>>     Continuation cont = contSupport.getContinuation();
> >>> >>>     if (cont.isResumed()) {
> >>> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
> >>> >>>        return handler.get().getReturn();
> >>> >>>     } else {
> >>> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
> >>> >>>         jmsGreeter.greetMeAsync(arg, handler);
> >>> >>>         cont.suspend(handler);
> >>> >>> return null;   //won't actually get here as suspend will throw a
> >>> >>> ContinuationException
> >>> >>>     }
> >>> >>> }
> >>> >>>
> >>> >>> The Handler would look something like:
> >>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
> >>> >>> GreetMeResponse resp;
> >>> >>>        Continuation cont;
> >>> >>> public Handler(Continuation cont) {
> >>> >>>            this.cont = cont;
> >>> >>>        }
> >>> >>>        public void handleResponse(Response<GreetMeLaterResponse>
> >>> >>> response) { resp = response.get();
> >>> >>>              cont.resume();
> >>> >>>       }
> >>> >>> }
> >>> >>>
> >>> >>> Basically, the HTTP/Jetty transport could provide an implementation
> >>> >>> of ContinuationSupport that wrappers the jetty stuff.    JMS could
> >>> >>> provide one that's pretty much a null op.   Transports that cannot
> >>> >>> support it (like servlet) just wouldn't provide an implementation.
> >>> >>>
> >>> >>>
> >>> >>> Does that make sense?   Other ideas?
> >>> >>>
> >>> >>> Dan
> >>> >>>
> >>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
> >>> >>>> > No.   We don't want that.   Whatever we do should work for other
> >>> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to
> >>> >>>> > jetty continuations directly.
> >>> >>>>
> >>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
> >>> >>>> Ex.
> >>> >>>>
> >>> >>>> try {
> >>> >>>>   invoke(); // continuation.suspend() somehow by the code being
> >>> >>>> invoked upon }
> >>> >>>> catch (RuntimeException ex) {
> >>> >>>>
> >>> >>>> if
> >>> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"
> >>> >>>>)) throw new SuspendedFault(ex);
> >>> >>>>     // or PhaseInterceptorChain.suspend()
> >>> >>>> }
> >>> >>>> }
> >>> >>>>
> >>> >>>> > Most likely, we could add a "suspend()" method to
> >>> >>>> > PhaseInterceptorChain that would do something very similar and
> >>> >>>> > throw a "SuspendException" or something in the same package as
> >>> >>>> > PhaseInterceptorChain.
> >>> >>>>
> >>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call
> >>> >>>> though ?
> >>> >>>>
> >>> >>>> >   That would get propogated
> >>> >>>> > back to the JettyDestination that could then call the jetty
> >>> >>>> > things. The JMS transport could just catch it and more or less
> >>> >>>> > ignore it. We'd then have to add a "resume()" method to the
> >>> >>>> > chain which would call back onto a listener that the transport
> >>> >>>> > provides.   Jetty would just call the jetty resume stuff. JMS
> >>> >>>> > would probably put a runnable on the workqueue to restart the
> >>> >>>> > chain.
> >>> >>>>
> >>> >>>> ok
> >>> >>>>
> >>> >>>> > Also, suspend() would need to check if there is a listener.  If
> >>> >>>> > not, it should not throw the exception.   Thus, the servlet
> >>> >>>> > transport and CORBA stuff that couldn't do this would pretty
> >>> >>>> > much just ignore it.
> >>> >>>>
> >>> >>>> ok, not sure I understand about the listener but I think I see
> >>> >>>> what you mean...
> >>> >>>>
> >>> >>>> > Basically, this needs to be done in such a way that it CAN work
> >>> >>>> > for the non-jetty cases.   However, it also needs to be done in
> >>> >>>> > a way that doesn't affect existing transports.
> >>> >>>>
> >>> >>>> +1
> >>> >>>>
> >>> >>>> Cheers, Sergey
> >>> >>>>
> >>> >>>> > Dan
> >>> >>>> >
> >>> >>>> >> 2. Now, if the above can be figured out, the next problem
> >>> >>>> >> arises: when the "trigger" to wake up the continuation occurs
> >>> >>>> >>
> >>> >>>> >> I think we can can do in JettyDestination omething similar to
> >>> >>>> >> what is done in SMX. When getting a SuspendedFault exception,
> >>> >>>> >> we can extract from it the original continuation instance or
> >>> >>>> >> else we can do ContinuationSupport.getContinuation(request)
> >>> >>>> >> which should return us the instance. At this point we can use
> >>> >>>> >> it as a ket to store the current exchange plus all the other
> >>> >>>> >> info we may need.
> >>> >>>> >>
> >>> >>>> >> When the user/application code does continuation.resume(), the
> >>> >>>> >> Jetty thread will come back and we will use the
> >>> >>>> >> ContinuationSupport.getContinuation(request) to get us the
> >>> >>>> >> active continuation and use it to extract the suspended
> >>> >>>> >> exchange and proceed from there, say we'll call
> >>> >>>> >> PhaseInterceptorPhase.resume(), etc, something along the lines
> >>> >>>> >> you suggested
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty
> >>> >>>> >> much everything to make sure nothing is stored on the stack and
> >>> >>>> >> is "resumable". Once that is done, the rest is relatively easy.
> >>> >>>> >>
> >>> >>>> >> Yea - probably can be the quite challenging
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >> Thoughts ?
> >>> >>>> >>
> >>> >>>> >> Cheers, Sergey
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
> >>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
> >>> >>>> >> [3]
> >>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId
> >>> >>>> >>=126 42361 #ac tion_12642361
> >>> >>>> >
> >>> >>>> > --
> >>> >>>> > Daniel Kulp
> >>> >>>> > dkulp@apache.org
> >>> >>>> > http://dankulp.com/blog
> >>> >>>
> >>> >>> --
> >>> >>> Daniel Kulp
> >>> >>> dkulp@apache.org
> >>> >>> http://dankulp.com/blog
> >>
> >> --
> >> Daniel Kulp
> >> dkulp@apache.org
> >> http://dankulp.com/blog



-- 
Daniel Kulp
dkulp@apache.org
http://dankulp.com/blog

Re: Jetty Continuations in CXF

Posted by Guillaume Nodet <gn...@gmail.com>.
Agreed. Especially if continuations can be supported somehow on the
JMS transport, ServiceMix should not depend on the jetty internal
continuations.

On Wed, Nov 12, 2008 at 1:08 PM, Freeman Fang <fr...@gmail.com> wrote:
> Sergey Beryozkin wrote:
>>>>
>>>> So before a suspended runtime exception reaches the nearest catch block
>>>> in
>>>> the CXF code where we can get a chance to do something to preserve the
>>>> state
>>>> of the given invocation, resume() might've alreadty occurred.
>>>
>>> That's why you need some synchronization blocks to ensure resume can
>>> not be called before suspend has been handled somehow.
>>> I would suggest wrapping the jetty continuation into something more
>>> CXF specific which could be used for other transports too.
>>
>> Yes, I think we're on the same page here. Case 2 is exactly about the user
>> code interacting with the
>> (jetty - when http is involved) continuations indirectly. But I think we
>> agree that if a user code does suspend on an underlying jetty continuation
>> object directly then a race condition leading to a 'loss' of message might
>> occur if resume() is done in parallel, virtually immediately after suspend()
>> was called.
>>
>> Question : how will SMX CXF Binding Component interact with (Jetty)
>> continuations when dealing with CXF-originated invocations ? The
>> Continuation wrappers will be available through an internal CXF input
>> Message and through JAXWS WebServiceContext (or JAXRS one later on) - will
>> CXF BC be able to get hold of such wrappers  ? If yes then I guess we have
>> no problems at all ?
>>
> Yes, I think so, get continuation from cxf message
> (org.apache.cxf.message.Message) is fine for CxfBcConsumer.
>>>
>>> Whoever calls the suspend() does not really matter here.
>>> The problem is that suspending the continuation will trigger a timer in
>>> jetty.
>>> When this timer elapse, the request will be replayed with a flag on
>>> the continuation
>>> saying it has been timed out somehow (we check the cont.isPending()
>>> flag in smx).
>>> At this point, you need to ensure that the timeout will be handled
>>> correctly both on the
>>> http response side, and on the cxf message side.
>>>
>>
>> I'm feeling silly :-) as I don't get it.
>>
>> Input :
>>
>> 1. What CXF JettyDestination can do if it finds out that a continuation
>> with say 'resumed' status, which is what I see in case1 as I described
>> earlier has no message associated with it ?
>>
>> 2. Why it should care about the suspended/resumed status of the
>> continuation when its *isNew()* call returns *false* ?
>> If it has a message attached to it then what difference would it make for
>> the resumed invocation whether this invocation has been resumed due to that
>> continuation being resumed explicitly or timed out ? I honestly see no
>> difference
>>
>> Output :
>>
>> I don't so anything at all on the Jetty Destination output, as far as
>> continuations are concerned. Some code down the line calls cont.resume() or
>> this continuation expired, in both cases the request is retried by Jetty
>> (that's the inbound path). If the existing message is found on the incoming
>> http request's continuatiuon - we resume a paused invocation otherwise we
>> start a new one. As I said - we might throw an exception at this point if we
>> find that the not-new continuation has no message attached to it - logging a
>> warning for now.
>>
>> Either way, eventually this invocation returns. Why should we do anything
>> about the fact at this stage that this invocation was 'resumed' by the timer
>> expiring ?
>>
>> Am I totally slow ?
>>
>> Cheers, Sergey
>>
>
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com

Re: Jetty Continuations in CXF

Posted by Daniel Kulp <dk...@apache.org>.
Guillaume,

On Friday 14 November 2008 6:53:59 am Guillaume Nodet wrote:
> The probem with this approach is that you leave the burden to the user
> to maintain two different piece of codes.
> One that will support synchronous invocations and another one that
> will support asynchronous invocations.

The main reason I made it that way was to make it compatible with transports 
that haven't been updated to provide the continuation API's.   We can update 
the transports we have in tree, but for custom transports and such that are 
outside of our tree, they may not be updated.

Also, I really think the user should be able to tell if a continuation would 
really work or not and optimize things appropriately.   Instead of having 
a "worker thread" or thread pool doing async work and  the transport thread 
blocked, they could just do the work on the same thread and avoid the context 
switches and such.   Could keep thread in the thread pool open to do work for 
transports that CAN support it.

Dan


>
> On Fri, Nov 14, 2008 at 11:57 AM, Sergey Beryozkin
>
> <se...@progress.com> wrote:
> > this may be an option but i'd rather prefer users sticking to this
> > pattern as suggested by Dan earlier on for transports like Corba or
> > non-Servlet 3/not Jetty-aware HTTP ones :
> >
> > WebServiceContext context;
> > public String greetMe(String arg) {
> >    ContinuationSupport contSupport = (ContinuationSupport)
> >             context.get(ContinuationSupport.class.getName());
> >    if (contSupport == null) {
> >         //continuations not supported, must wait
> >         return jmsGreeter.greetMe(arg);
> >    }
> >    ....
> > }
> >
> >
> > In other words if a transport is not 'natively' async or can not handle
> > suspended requests like Jetty/servlet3 aware HTTP transport can, then we
> > won't even provide any ContinuationSupport wrappers...
> >
> > Cheers, Sergey
> >
> >> We may want the continuation support to work in both case.  This is
> >> the case in jetty, and we may want to mimic the same kind of behavior
> >> in case some transports do not support continuations.
> >> In such a case, the continuation simply blocks and waits for a call to
> >> resume(), in which case, the thread is unlocked and the processing is
> >> resumed.
> >>
> >> On Thu, Nov 13, 2008 at 12:06 PM, Sergey Beryozkin
> >>
> >> <se...@progress.com> wrote:
> >>> I've added a simple HTTPS test and with HTTPS we have no luck at the
> >>> moment,
> >>>
> >>> CxfJettySslSocketConnector extends Jetty SslSocketConnector and calling
> >>> continuation.suspend(timeout) simply blocks the calling thread.
> >>>
> >>> I've checked the archives and I believe at a time
> >>> SslSelectChannelConnector[1] was considered unstable so instead a
> >>> blocking
> >>> SslSocketConnector was picked up.
> >>>
> >>> Any thoughts on what would it take to upgrade
> >>> CxfJettySslSocketConnector to
> >>> use SslSelectChannelConnector ?
> >>>
> >>> Thanks, Segey
> >>>
> >>> [1]
> >>>
> >>> http://www.mortbay.org/jetty/jetty-6/apidocs/org/mortbay/jetty/security
> >>>/SslSelectChannelConnector.html
> >>>
> >>>>>> Question : how will SMX CXF Binding Component interact with (Jetty)
> >>>>>> continuations when dealing with CXF-originated invocations ? The
> >>>>>> Continuation wrappers will be available through an internal CXF
> >>>>>> input Message and through JAXWS WebServiceContext (or JAXRS one
> >>>>>> later on) - will
> >>>>>> CXF BC be able to get hold of such wrappers  ? If yes then I guess
> >>>>>> we have
> >>>>>> no problems at all ?
> >>>>>
> >>>>> Yes, I think so, get continuation from cxf message
> >>>>> (org.apache.cxf.message.Message) is fine for CxfBcConsumer.
> >>>>
> >>>> super.
> >>>>
> >>>> I've just copied the relevant code only to sandbox as I didn't manage
> >>>> to create a branch :
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations
> >>>>
> >>>> Some comments. You can see in :
> >>>>
> >>>> 1. The wrapper interfaces for ContinuationSupport and Continuation in
> >>>>
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/s
> >>>>rc/main/java/org/apache/cxf/continuations
> >>>>
> >>>> 2. How PhaseInterceptorChain deals with suspended exceptions in
> >>>>
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/s
> >>>>rc/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
> >>>>
> >>>> 3. How AbstractInvoker deals with suspended exceptions in
> >>>>
> >>>>
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/co
> >>>>re/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
> >>>>
> >>>> 4. How ChainInitiatorObserver deals with resuming an invocation chain
> >>>> in
> >>>>
> >>>>
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/co
> >>>>re/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
> >>>>
> >>>> 5. Jetty-specific continuation wrappers in
> >>>>
> >>>>
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/tr
> >>>>ansports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/c
> >>>>ontinuations
> >>>>
> >>>> 6. How JettyDestination deals with continuations :
> >>>>
> >>>>
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/tr
> >>>>ansports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/J
> >>>>ettyHTTPDestination.java
> >>>>
> >>>> 7. System tests :
> >>>>
> >>>>
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/syste
> >>>>sts/src/test/java/org/apache/cxf/systest/jaxws/continuations
> >>>>
> >>>> See the server code on how the test code interacts with continuations
> >>>> through wrappers :
> >>>>
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/syste
> >>>>sts/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplW
> >>>>ithWrapppedContinuation.java
> >>>>
> >>>> and how it does so directly
> >>>>
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/syste
> >>>>sts/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplW
> >>>>ithContinuation.java
> >>>>
> >>>> check
> >>>>
> >>>>
> >>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/syste
> >>>>sts/src/test/java/org/apache/cxf/systest/jaxws/continuations/jetty-engi
> >>>>ne.xml
> >>>>
> >>>> on how a jetty engine on a specific port can be told to ignore
> >>>> continuations which are supported by default if true is set or that
> >>>> attribute is omitted (not used in the test though)
> >>>>
> >>>> Ok - you can now see it it so shoot :-) Comments are welcome
> >>>>
> >>>> Sergey
> >>
> >> --
> >> Cheers,
> >> Guillaume Nodet
> >> ------------------------
> >> Blog: http://gnodet.blogspot.com/
> >> ------------------------
> >> Open Source SOA
> >> http://fusesource.com



-- 
Daniel Kulp
dkulp@apache.org
http://dankulp.com/blog

Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.

> The probem with this approach is that you leave the burden to the user
> to maintain two different piece of codes.
> One that will support synchronous invocations and another one that
> will support asynchronous invocations.

Well, I'd argue this burden will be negligible compared to the complexity a user would face when writing an asynchronous code in the 
first place.
Having a 2 liner code branch is not a big deal at all IMHO. A user would have to check for null anyway, right ? So this check will 
fit naturally

Cheers, Sergey



>
> On Fri, Nov 14, 2008 at 11:57 AM, Sergey Beryozkin
> <se...@progress.com> wrote:
>> this may be an option but i'd rather prefer users sticking to this pattern
>> as suggested by Dan earlier on for transports like Corba or non-Servlet
>> 3/not Jetty-aware HTTP ones :
>>
>> WebServiceContext context;
>> public String greetMe(String arg) {
>>    ContinuationSupport contSupport = (ContinuationSupport)
>>             context.get(ContinuationSupport.class.getName());
>>    if (contSupport == null) {
>>         //continuations not supported, must wait
>>         return jmsGreeter.greetMe(arg);
>>    }
>>    ....
>> }
>>
>>
>> In other words if a transport is not 'natively' async or can not handle
>> suspended requests like Jetty/servlet3 aware HTTP transport can, then we
>> won't even provide any ContinuationSupport wrappers...
>>
>> Cheers, Sergey
>>
>>
>>
>>> We may want the continuation support to work in both case.  This is
>>> the case in jetty, and we may want to mimic the same kind of behavior
>>> in case some transports do not support continuations.
>>> In such a case, the continuation simply blocks and waits for a call to
>>> resume(), in which case, the thread is unlocked and the processing is
>>> resumed.
>>>
>>> On Thu, Nov 13, 2008 at 12:06 PM, Sergey Beryozkin
>>> <se...@progress.com> wrote:
>>>>
>>>> I've added a simple HTTPS test and with HTTPS we have no luck at the
>>>> moment,
>>>>
>>>> CxfJettySslSocketConnector extends Jetty SslSocketConnector and calling
>>>> continuation.suspend(timeout) simply blocks the calling thread.
>>>>
>>>> I've checked the archives and I believe at a time
>>>> SslSelectChannelConnector[1] was considered unstable so instead a
>>>> blocking
>>>> SslSocketConnector was picked up.
>>>>
>>>> Any thoughts on what would it take to upgrade CxfJettySslSocketConnector
>>>> to
>>>> use SslSelectChannelConnector ?
>>>>
>>>> Thanks, Segey
>>>>
>>>> [1]
>>>>
>>>> http://www.mortbay.org/jetty/jetty-6/apidocs/org/mortbay/jetty/security/SslSelectChannelConnector.html
>>>>
>>>>>>>
>>>>>>> Question : how will SMX CXF Binding Component interact with (Jetty)
>>>>>>> continuations when dealing with CXF-originated invocations ? The
>>>>>>> Continuation wrappers will be available through an internal CXF input
>>>>>>> Message and through JAXWS WebServiceContext (or JAXRS one later on) -
>>>>>>> will
>>>>>>> CXF BC be able to get hold of such wrappers  ? If yes then I guess we
>>>>>>> have
>>>>>>> no problems at all ?
>>>>>>>
>>>>>> Yes, I think so, get continuation from cxf message
>>>>>> (org.apache.cxf.message.Message) is fine for CxfBcConsumer.
>>>>>
>>>>> super.
>>>>>
>>>>> I've just copied the relevant code only to sandbox as I didn't manage to
>>>>> create a branch :
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations
>>>>>
>>>>> Some comments. You can see in :
>>>>>
>>>>> 1. The wrapper interfaces for ContinuationSupport and Continuation in
>>>>>
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/continuations
>>>>>
>>>>> 2. How PhaseInterceptorChain deals with suspended exceptions in
>>>>>
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
>>>>>
>>>>> 3. How AbstractInvoker deals with suspended exceptions in
>>>>>
>>>>>
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
>>>>>
>>>>> 4. How ChainInitiatorObserver deals with resuming an invocation chain in
>>>>>
>>>>>
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
>>>>>
>>>>> 5. Jetty-specific continuation wrappers in
>>>>>
>>>>>
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations
>>>>>
>>>>> 6. How JettyDestination deals with continuations :
>>>>>
>>>>>
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
>>>>>
>>>>> 7. System tests :
>>>>>
>>>>>
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations
>>>>>
>>>>> See the server code on how the test code interacts with continuations
>>>>> through wrappers :
>>>>>
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithWrapppedContinuation.java
>>>>>
>>>>> and how it does so directly
>>>>>
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithContinuation.java
>>>>>
>>>>> check
>>>>>
>>>>>
>>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/jetty-engine.xml
>>>>>
>>>>> on how a jetty engine on a specific port can be told to ignore
>>>>> continuations which are supported by default if true is set or that
>>>>> attribute is omitted (not used in the test though)
>>>>>
>>>>> Ok - you can now see it it so shoot :-) Comments are welcome
>>>>>
>>>>> Sergey
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Cheers,
>>> Guillaume Nodet
>>> ------------------------
>>> Blog: http://gnodet.blogspot.com/
>>> ------------------------
>>> Open Source SOA
>>> http://fusesource.com
>>>
>>
>>
>>
>
>
>
> -- 
> Cheers,
> Guillaume Nodet
> ------------------------
> Blog: http://gnodet.blogspot.com/
> ------------------------
> Open Source SOA
> http://fusesource.com
> 



Re: Jetty Continuations in CXF

Posted by Guillaume Nodet <gn...@gmail.com>.
The probem with this approach is that you leave the burden to the user
to maintain two different piece of codes.
One that will support synchronous invocations and another one that
will support asynchronous invocations.

On Fri, Nov 14, 2008 at 11:57 AM, Sergey Beryozkin
<se...@progress.com> wrote:
> this may be an option but i'd rather prefer users sticking to this pattern
> as suggested by Dan earlier on for transports like Corba or non-Servlet
> 3/not Jetty-aware HTTP ones :
>
> WebServiceContext context;
> public String greetMe(String arg) {
>    ContinuationSupport contSupport = (ContinuationSupport)
>             context.get(ContinuationSupport.class.getName());
>    if (contSupport == null) {
>         //continuations not supported, must wait
>         return jmsGreeter.greetMe(arg);
>    }
>    ....
> }
>
>
> In other words if a transport is not 'natively' async or can not handle
> suspended requests like Jetty/servlet3 aware HTTP transport can, then we
> won't even provide any ContinuationSupport wrappers...
>
> Cheers, Sergey
>
>
>
>> We may want the continuation support to work in both case.  This is
>> the case in jetty, and we may want to mimic the same kind of behavior
>> in case some transports do not support continuations.
>> In such a case, the continuation simply blocks and waits for a call to
>> resume(), in which case, the thread is unlocked and the processing is
>> resumed.
>>
>> On Thu, Nov 13, 2008 at 12:06 PM, Sergey Beryozkin
>> <se...@progress.com> wrote:
>>>
>>> I've added a simple HTTPS test and with HTTPS we have no luck at the
>>> moment,
>>>
>>> CxfJettySslSocketConnector extends Jetty SslSocketConnector and calling
>>> continuation.suspend(timeout) simply blocks the calling thread.
>>>
>>> I've checked the archives and I believe at a time
>>> SslSelectChannelConnector[1] was considered unstable so instead a
>>> blocking
>>> SslSocketConnector was picked up.
>>>
>>> Any thoughts on what would it take to upgrade CxfJettySslSocketConnector
>>> to
>>> use SslSelectChannelConnector ?
>>>
>>> Thanks, Segey
>>>
>>> [1]
>>>
>>> http://www.mortbay.org/jetty/jetty-6/apidocs/org/mortbay/jetty/security/SslSelectChannelConnector.html
>>>
>>>>>>
>>>>>> Question : how will SMX CXF Binding Component interact with (Jetty)
>>>>>> continuations when dealing with CXF-originated invocations ? The
>>>>>> Continuation wrappers will be available through an internal CXF input
>>>>>> Message and through JAXWS WebServiceContext (or JAXRS one later on) -
>>>>>> will
>>>>>> CXF BC be able to get hold of such wrappers  ? If yes then I guess we
>>>>>> have
>>>>>> no problems at all ?
>>>>>>
>>>>> Yes, I think so, get continuation from cxf message
>>>>> (org.apache.cxf.message.Message) is fine for CxfBcConsumer.
>>>>
>>>> super.
>>>>
>>>> I've just copied the relevant code only to sandbox as I didn't manage to
>>>> create a branch :
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations
>>>>
>>>> Some comments. You can see in :
>>>>
>>>> 1. The wrapper interfaces for ContinuationSupport and Continuation in
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/continuations
>>>>
>>>> 2. How PhaseInterceptorChain deals with suspended exceptions in
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
>>>>
>>>> 3. How AbstractInvoker deals with suspended exceptions in
>>>>
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
>>>>
>>>> 4. How ChainInitiatorObserver deals with resuming an invocation chain in
>>>>
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
>>>>
>>>> 5. Jetty-specific continuation wrappers in
>>>>
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations
>>>>
>>>> 6. How JettyDestination deals with continuations :
>>>>
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
>>>>
>>>> 7. System tests :
>>>>
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations
>>>>
>>>> See the server code on how the test code interacts with continuations
>>>> through wrappers :
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithWrapppedContinuation.java
>>>>
>>>> and how it does so directly
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithContinuation.java
>>>>
>>>> check
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/jetty-engine.xml
>>>>
>>>> on how a jetty engine on a specific port can be told to ignore
>>>> continuations which are supported by default if true is set or that
>>>> attribute is omitted (not used in the test though)
>>>>
>>>> Ok - you can now see it it so shoot :-) Comments are welcome
>>>>
>>>> Sergey
>>>>
>>>>
>>>
>>>
>>
>>
>>
>> --
>> Cheers,
>> Guillaume Nodet
>> ------------------------
>> Blog: http://gnodet.blogspot.com/
>> ------------------------
>> Open Source SOA
>> http://fusesource.com
>>
>
>
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com

Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
this may be an option but i'd rather prefer users sticking to this pattern as suggested by Dan earlier on for transports like Corba 
or non-Servlet 3/not Jetty-aware HTTP ones :

WebServiceContext context;
public String greetMe(String arg) {
     ContinuationSupport contSupport = (ContinuationSupport)
              context.get(ContinuationSupport.class.getName());
     if (contSupport == null) {
          //continuations not supported, must wait
          return jmsGreeter.greetMe(arg);
     }
     ....
}


In other words if a transport is not 'natively' async or can not handle suspended requests like Jetty/servlet3 aware HTTP transport 
can, then we won't even provide any ContinuationSupport wrappers...

Cheers, Sergey



> We may want the continuation support to work in both case.  This is
> the case in jetty, and we may want to mimic the same kind of behavior
> in case some transports do not support continuations.
> In such a case, the continuation simply blocks and waits for a call to
> resume(), in which case, the thread is unlocked and the processing is
> resumed.
>
> On Thu, Nov 13, 2008 at 12:06 PM, Sergey Beryozkin
> <se...@progress.com> wrote:
>> I've added a simple HTTPS test and with HTTPS we have no luck at the moment,
>>
>> CxfJettySslSocketConnector extends Jetty SslSocketConnector and calling
>> continuation.suspend(timeout) simply blocks the calling thread.
>>
>> I've checked the archives and I believe at a time
>> SslSelectChannelConnector[1] was considered unstable so instead a blocking
>> SslSocketConnector was picked up.
>>
>> Any thoughts on what would it take to upgrade CxfJettySslSocketConnector to
>> use SslSelectChannelConnector ?
>>
>> Thanks, Segey
>>
>> [1]
>> http://www.mortbay.org/jetty/jetty-6/apidocs/org/mortbay/jetty/security/SslSelectChannelConnector.html
>>
>>>>>
>>>>> Question : how will SMX CXF Binding Component interact with (Jetty)
>>>>> continuations when dealing with CXF-originated invocations ? The
>>>>> Continuation wrappers will be available through an internal CXF input
>>>>> Message and through JAXWS WebServiceContext (or JAXRS one later on) - will
>>>>> CXF BC be able to get hold of such wrappers  ? If yes then I guess we have
>>>>> no problems at all ?
>>>>>
>>>> Yes, I think so, get continuation from cxf message
>>>> (org.apache.cxf.message.Message) is fine for CxfBcConsumer.
>>>
>>> super.
>>>
>>> I've just copied the relevant code only to sandbox as I didn't manage to
>>> create a branch :
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations
>>>
>>> Some comments. You can see in :
>>>
>>> 1. The wrapper interfaces for ContinuationSupport and Continuation in
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/continuations
>>>
>>> 2. How PhaseInterceptorChain deals with suspended exceptions in
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
>>>
>>> 3. How AbstractInvoker deals with suspended exceptions in
>>>
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
>>>
>>> 4. How ChainInitiatorObserver deals with resuming an invocation chain in
>>>
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
>>>
>>> 5. Jetty-specific continuation wrappers in
>>>
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations
>>>
>>> 6. How JettyDestination deals with continuations :
>>>
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
>>>
>>> 7. System tests :
>>>
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations
>>>
>>> See the server code on how the test code interacts with continuations
>>> through wrappers :
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithWrapppedContinuation.java
>>>
>>> and how it does so directly
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithContinuation.java
>>>
>>> check
>>>
>>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/jetty-engine.xml
>>>
>>> on how a jetty engine on a specific port can be told to ignore
>>> continuations which are supported by default if true is set or that
>>> attribute is omitted (not used in the test though)
>>>
>>> Ok - you can now see it it so shoot :-) Comments are welcome
>>>
>>> Sergey
>>>
>>>
>>
>>
>
>
>
> -- 
> Cheers,
> Guillaume Nodet
> ------------------------
> Blog: http://gnodet.blogspot.com/
> ------------------------
> Open Source SOA
> http://fusesource.com
> 



Re: Jetty Continuations in CXF

Posted by Guillaume Nodet <gn...@gmail.com>.
We may want the continuation support to work in both case.  This is
the case in jetty, and we may want to mimic the same kind of behavior
in case some transports do not support continuations.
In such a case, the continuation simply blocks and waits for a call to
resume(), in which case, the thread is unlocked and the processing is
resumed.

On Thu, Nov 13, 2008 at 12:06 PM, Sergey Beryozkin
<se...@progress.com> wrote:
> I've added a simple HTTPS test and with HTTPS we have no luck at the moment,
>
> CxfJettySslSocketConnector extends Jetty SslSocketConnector and calling
> continuation.suspend(timeout) simply blocks the calling thread.
>
> I've checked the archives and I believe at a time
> SslSelectChannelConnector[1] was considered unstable so instead a blocking
> SslSocketConnector was picked up.
>
> Any thoughts on what would it take to upgrade CxfJettySslSocketConnector to
> use SslSelectChannelConnector ?
>
> Thanks, Segey
>
> [1]
> http://www.mortbay.org/jetty/jetty-6/apidocs/org/mortbay/jetty/security/SslSelectChannelConnector.html
>
>>>>
>>>> Question : how will SMX CXF Binding Component interact with (Jetty)
>>>> continuations when dealing with CXF-originated invocations ? The
>>>> Continuation wrappers will be available through an internal CXF input
>>>> Message and through JAXWS WebServiceContext (or JAXRS one later on) - will
>>>> CXF BC be able to get hold of such wrappers  ? If yes then I guess we have
>>>> no problems at all ?
>>>>
>>> Yes, I think so, get continuation from cxf message
>>> (org.apache.cxf.message.Message) is fine for CxfBcConsumer.
>>
>> super.
>>
>> I've just copied the relevant code only to sandbox as I didn't manage to
>> create a branch :
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations
>>
>> Some comments. You can see in :
>>
>> 1. The wrapper interfaces for ContinuationSupport and Continuation in
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/continuations
>>
>> 2. How PhaseInterceptorChain deals with suspended exceptions in
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
>>
>> 3. How AbstractInvoker deals with suspended exceptions in
>>
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
>>
>> 4. How ChainInitiatorObserver deals with resuming an invocation chain in
>>
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
>>
>> 5. Jetty-specific continuation wrappers in
>>
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations
>>
>> 6. How JettyDestination deals with continuations :
>>
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
>>
>> 7. System tests :
>>
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations
>>
>> See the server code on how the test code interacts with continuations
>> through wrappers :
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithWrapppedContinuation.java
>>
>> and how it does so directly
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithContinuation.java
>>
>> check
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/jetty-engine.xml
>>
>> on how a jetty engine on a specific port can be told to ignore
>> continuations which are supported by default if true is set or that
>> attribute is omitted (not used in the test though)
>>
>> Ok - you can now see it it so shoot :-) Comments are welcome
>>
>> Sergey
>>
>>
>
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com

Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
FYI, as far as designing continuation providers/wrappers is concerned, I decided to drop a mutex parameter from

ContinuationProvider.getContinuation(Object mutex)

which in case of Jetty

maps to

class JettyContinuationProvider {
  private HttpServletRequest req;

  public ContinuationWrapper getContinuation(Object mutex) {
      return new JettyContinuationWrapper(ContinuationSupport..getContinuation(req, mutex));
  }
}

The problem with supporting user-provider mutexes is that, say in case of Jetty, Continuation  interface provides no getMutex() 
method, but when dealing with continuations internally  in CXF, we'd need to use them to synchronize an access to a given 
ContinuationWrapper/Continuation. So we'd need to keep a map of user mutexes to continuation instances in say 
JettyContinuationProvider.

So I reckon it's too much of the overhead, and CXF users, when writing continuations code will have to synchonize on 
ContinuationWrapper instances, something what ServiceMix does when dealing with Jetty continuations.

Hope it makes sense - shout please if anyone has some practical examples when disallowing user mutexes might be an issue

Cheers, Sergey




----- Original Message ----- 
From: "Sergey Beryozkin" <se...@progress.com>
To: <de...@cxf.apache.org>
Sent: Thursday, November 13, 2008 11:06 AM
Subject: Re: Jetty Continuations in CXF


> I've added a simple HTTPS test and with HTTPS we have no luck at the moment,
>
> CxfJettySslSocketConnector extends Jetty SslSocketConnector and calling continuation.suspend(timeout) simply blocks the calling 
> thread.
>
> I've checked the archives and I believe at a time SslSelectChannelConnector[1] was considered unstable so instead a blocking 
> SslSocketConnector was picked up.
>
> Any thoughts on what would it take to upgrade CxfJettySslSocketConnector to use SslSelectChannelConnector ?
>
> Thanks, Segey
>
> [1] http://www.mortbay.org/jetty/jetty-6/apidocs/org/mortbay/jetty/security/SslSelectChannelConnector.html
>
>>>>
>>>> Question : how will SMX CXF Binding Component interact with (Jetty) continuations when dealing with CXF-originated invocations 
>>>> ? The Continuation wrappers will be available through an internal CXF input Message and through JAXWS WebServiceContext (or 
>>>> JAXRS one later on) - will CXF BC be able to get hold of such wrappers  ? If yes then I guess we have no problems at all ?
>>>>
>>> Yes, I think so, get continuation from cxf message (org.apache.cxf.message.Message) is fine for CxfBcConsumer.
>>
>> super.
>>
>> I've just copied the relevant code only to sandbox as I didn't manage to create a branch :
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations
>>
>> Some comments. You can see in :
>>
>> 1. The wrapper interfaces for ContinuationSupport and Continuation in
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/continuations
>>
>> 2. How PhaseInterceptorChain deals with suspended exceptions in
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
>>
>> 3. How AbstractInvoker deals with suspended exceptions in
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
>>
>> 4. How ChainInitiatorObserver deals with resuming an invocation chain in
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
>>
>> 5. Jetty-specific continuation wrappers in
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations
>>
>> 6. How JettyDestination deals with continuations :
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
>>
>> 7. System tests :
>>
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations
>>
>> See the server code on how the test code interacts with continuations through wrappers :
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithWrapppedContinuation.java
>>
>> and how it does so directly
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithContinuation.java
>>
>> check
>> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/jetty-engine.xml
>>
>> on how a jetty engine on a specific port can be told to ignore continuations which are supported by default if true is set or 
>> that attribute is omitted (not used in the test though)
>>
>> Ok - you can now see it it so shoot :-) Comments are welcome
>>
>> Sergey
>>
>>
> 



Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
I've added a simple HTTPS test and with HTTPS we have no luck at the moment,

CxfJettySslSocketConnector extends Jetty SslSocketConnector and calling continuation.suspend(timeout) simply blocks the calling 
thread.

I've checked the archives and I believe at a time SslSelectChannelConnector[1] was considered unstable so instead a blocking 
SslSocketConnector was picked up.

Any thoughts on what would it take to upgrade CxfJettySslSocketConnector to use SslSelectChannelConnector ?

Thanks, Segey

[1] http://www.mortbay.org/jetty/jetty-6/apidocs/org/mortbay/jetty/security/SslSelectChannelConnector.html

>>>
>>> Question : how will SMX CXF Binding Component interact with (Jetty) continuations when dealing with CXF-originated invocations ? 
>>> The Continuation wrappers will be available through an internal CXF input Message and through JAXWS WebServiceContext (or JAXRS 
>>> one later on) - will CXF BC be able to get hold of such wrappers  ? If yes then I guess we have no problems at all ?
>>>
>> Yes, I think so, get continuation from cxf message (org.apache.cxf.message.Message) is fine for CxfBcConsumer.
>
> super.
>
> I've just copied the relevant code only to sandbox as I didn't manage to create a branch :
>
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations
>
> Some comments. You can see in :
>
> 1. The wrapper interfaces for ContinuationSupport and Continuation in
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/continuations
>
> 2. How PhaseInterceptorChain deals with suspended exceptions in
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
>
> 3. How AbstractInvoker deals with suspended exceptions in
>
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
>
> 4. How ChainInitiatorObserver deals with resuming an invocation chain in
>
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
>
> 5. Jetty-specific continuation wrappers in
>
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations
>
> 6. How JettyDestination deals with continuations :
>
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
>
> 7. System tests :
>
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations
>
> See the server code on how the test code interacts with continuations through wrappers :
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithWrapppedContinuation.java
>
> and how it does so directly
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithContinuation.java
>
> check
> https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/jetty-engine.xml
>
> on how a jetty engine on a specific port can be told to ignore continuations which are supported by default if true is set or that 
> attribute is omitted (not used in the test though)
>
> Ok - you can now see it it so shoot :-) Comments are welcome
>
> Sergey
>
> 


Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
Hi,
>>
>> Question : how will SMX CXF Binding Component interact with (Jetty) continuations when dealing with CXF-originated invocations ? 
>> The Continuation wrappers will be available through an internal CXF input Message and through JAXWS WebServiceContext (or JAXRS 
>> one later on) - will CXF BC be able to get hold of such wrappers  ? If yes then I guess we have no problems at all ?
>>
> Yes, I think so, get continuation from cxf message (org.apache.cxf.message.Message) is fine for CxfBcConsumer.

super.

I've just copied the relevant code only to sandbox as I didn't manage to create a branch :

https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations

Some comments. You can see in :

1. The wrapper interfaces for ContinuationSupport and Continuation in
https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/continuations

2. How PhaseInterceptorChain deals with suspended exceptions in
https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java

3. How AbstractInvoker deals with suspended exceptions in

https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java

4. How ChainInitiatorObserver deals with resuming an invocation chain in

https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java

5. Jetty-specific continuation wrappers in

https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations

6. How JettyDestination deals with continuations :

https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java

7. System tests :

https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations

See the server code on how the test code interacts with continuations through wrappers :
https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithWrapppedContinuation.java

and how it does so directly
https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/HelloImplWithContinuation.java

check
https://svn.apache.org/repos/asf/cxf/sandbox/2.2.x-continuations/systests/src/test/java/org/apache/cxf/systest/jaxws/continuations/jetty-engine.xml

on how a jetty engine on a specific port can be told to ignore continuations which are supported by default if true is set or that 
attribute is omitted (not used in the test though)

Ok - you can now see it it so shoot :-) Comments are welcome

Sergey



Re: Jetty Continuations in CXF

Posted by Freeman Fang <fr...@gmail.com>.
Sergey Beryozkin wrote:
>>>
>>> So before a suspended runtime exception reaches the nearest catch 
>>> block in
>>> the CXF code where we can get a chance to do something to preserve 
>>> the state
>>> of the given invocation, resume() might've alreadty occurred.
>>
>> That's why you need some synchronization blocks to ensure resume can
>> not be called before suspend has been handled somehow.
>> I would suggest wrapping the jetty continuation into something more
>> CXF specific which could be used for other transports too.
>
> Yes, I think we're on the same page here. Case 2 is exactly about the 
> user code interacting with the
> (jetty - when http is involved) continuations indirectly. But I think 
> we agree that if a user code does suspend on an underlying jetty 
> continuation object directly then a race condition leading to a 'loss' 
> of message might occur if resume() is done in parallel, virtually 
> immediately after suspend() was called.
>
> Question : how will SMX CXF Binding Component interact with (Jetty) 
> continuations when dealing with CXF-originated invocations ? The 
> Continuation wrappers will be available through an internal CXF input 
> Message and through JAXWS WebServiceContext (or JAXRS one later on) - 
> will CXF BC be able to get hold of such wrappers  ? If yes then I 
> guess we have no problems at all ?
>
Yes, I think so, get continuation from cxf message 
(org.apache.cxf.message.Message) is fine for CxfBcConsumer.
>>
>> Whoever calls the suspend() does not really matter here.
>> The problem is that suspending the continuation will trigger a timer 
>> in jetty.
>> When this timer elapse, the request will be replayed with a flag on
>> the continuation
>> saying it has been timed out somehow (we check the cont.isPending()
>> flag in smx).
>> At this point, you need to ensure that the timeout will be handled
>> correctly both on the
>> http response side, and on the cxf message side.
>>
>
> I'm feeling silly :-) as I don't get it.
>
> Input :
>
> 1. What CXF JettyDestination can do if it finds out that a 
> continuation with say 'resumed' status, which is what I see in case1 
> as I described earlier has no message associated with it ?
>
> 2. Why it should care about the suspended/resumed status of the 
> continuation when its *isNew()* call returns *false* ?
> If it has a message attached to it then what difference would it make 
> for the resumed invocation whether this invocation has been resumed 
> due to that continuation being resumed explicitly or timed out ? I 
> honestly see no difference
>
> Output :
>
> I don't so anything at all on the Jetty Destination output, as far as 
> continuations are concerned. Some code down the line calls 
> cont.resume() or this continuation expired, in both cases the request 
> is retried by Jetty (that's the inbound path). If the existing message 
> is found on the incoming http request's continuatiuon - we resume a 
> paused invocation otherwise we start a new one. As I said - we might 
> throw an exception at this point if we find that the not-new 
> continuation has no message attached to it - logging a warning for now.
>
> Either way, eventually this invocation returns. Why should we do 
> anything about the fact at this stage that this invocation was 
> 'resumed' by the timer expiring ?
>
> Am I totally slow ?
>
> Cheers, Sergey
>


Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
>>
>> So before a suspended runtime exception reaches the nearest catch block in
>> the CXF code where we can get a chance to do something to preserve the state
>> of the given invocation, resume() might've alreadty occurred.
>
> That's why you need some synchronization blocks to ensure resume can
> not be called before suspend has been handled somehow.
> I would suggest wrapping the jetty continuation into something more
> CXF specific which could be used for other transports too.

Yes, I think we're on the same page here. Case 2 is exactly about the user code interacting with the
(jetty - when http is involved) continuations indirectly. But I think we agree that if a user code does suspend on an underlying 
jetty continuation object directly then a race condition leading to a 'loss' of message might occur if resume() is done in parallel, 
virtually immediately after suspend() was called.

Question : how will SMX CXF Binding Component interact with (Jetty) continuations when dealing with CXF-originated invocations ? The 
Continuation wrappers will be available through an internal CXF input Message and through JAXWS WebServiceContext (or JAXRS one 
later on) - will CXF BC be able to get hold of such wrappers  ? If yes then I guess we have no problems at all ?

>
> Whoever calls the suspend() does not really matter here.
> The problem is that suspending the continuation will trigger a timer in jetty.
> When this timer elapse, the request will be replayed with a flag on
> the continuation
> saying it has been timed out somehow (we check the cont.isPending()
> flag in smx).
> At this point, you need to ensure that the timeout will be handled
> correctly both on the
> http response side, and on the cxf message side.
>

I'm feeling silly :-) as I don't get it.

Input :

1. What CXF JettyDestination can do if it finds out that a continuation with say 'resumed' status, which is what I see in case1 as I 
described earlier has no message associated with it ?

2. Why it should care about the suspended/resumed status of the continuation when its *isNew()* call returns *false* ?
If it has a message attached to it then what difference would it make for the resumed invocation whether this invocation has been 
resumed due to that continuation being resumed explicitly or timed out ? I honestly see no difference

Output :

I don't so anything at all on the Jetty Destination output, as far as continuations are concerned. Some code down the line calls 
cont.resume() or this continuation expired, in both cases the request is retried by Jetty (that's the inbound path). If the existing 
message is found on the incoming http request's continuatiuon - we resume a paused invocation otherwise we start a new one. As I 
said - we might throw an exception at this point if we find that the not-new continuation has no message attached to it - logging a 
warning for now.

Either way, eventually this invocation returns. Why should we do anything about the fact at this stage that this invocation was 
'resumed' by the timer expiring ?

Am I totally slow ?

Cheers, Sergey 


Re: Jetty Continuations in CXF

Posted by Guillaume Nodet <gn...@gmail.com>.
On Wed, Nov 12, 2008 at 11:51 AM, Sergey Beryozkin
<se...@progress.com> wrote:
> Hi
>
>
>> You need to associate the continuation with the message before the
>> suspend() method is called, so that whenever the message is ready, it
>> will work.
>
> Yes - agreed. But as I said we have two cases.
>
> Case1.
>
> Jetty -> CXF -> test code/some other code does direct jetty
> continuation.suspend()
>
> In this case it's essentially a user code which does it and a user code has
> no notion of internal CXF Message class. It just invokes on a jetty
> continuation. It's this code which will do suspend/resume and it's in this
> case when there's a race condition between the moment a user (my test) code
> does continuation.suspend() (on one thread) and immediately after that
> continuation.resume() on the other one.
>
> See what I mean ? is it how we can expect the SMX CXF binding component
> interacting with Jetty continuations (apart from it not doing resume()
> immediately I guess) ?
>
> So before a suspended runtime exception reaches the nearest catch block in
> the CXF code where we can get a chance to do something to preserve the state
> of the given invocation, resume() might've alreadty occurred.

That's why you need some synchronization blocks to ensure resume can
not be called before suspend has been handled somehow.
I would suggest wrapping the jetty continuation into something more
CXF specific which could be used for other transports too.


> Case 2.
>
> Jetty -> CXF -> test code/some other code interacts with continuations in a
> transport-neutral way through CXF provided wrappers. Now, in this case what
> happens is that we do preseve the message before doing suspend(), as you
> suggested, so everything goes fine.
>
>> Also, I think you will have to care about timeouts ...
>
> Why ? It's a CXF user code which calls suspend(). In CXF Jetty Destination I
> attempt to get a message from a ContinuationSupport.getContinuation(). If
> the returned continuation is not new and it has no message associated with
> it then there's really nothing CXF can do but to procede with a new
> invocation, irrespectively of wheteher this continuation was resumed or
> timed-out. It may throw an exception in this case but for now I prefer to
> log a warning as the things seems to be working anyway - it will be up to a
> user code to do some more drastic actions.
>
> It's likely I'm missing some subtle or even obvious details but for now
> things seem to be quote clear to me.

Whoever calls the suspend() does not really matter here.
The problem is that suspending the continuation will trigger a timer in jetty.
When this timer elapse, the request will be replayed with a flag on
the continuation
saying it has been timed out somehow (we check the cont.isPending()
flag in smx).
At this point, you need to ensure that the timeout will be handled
correctly both on the
http response side, and on the cxf message side.

>> Another thing: it would be nice if you could create a branch and
>> commit your ongoing work there so that we can have something more
>> tangible to discuss on ... ;-)  We may has well just drop it later, it
>> does not really matter.
>
> sorry - I see it would really help to discuss things better
>
> Cheers, Sergey
>
>>
>> On Wed, Nov 12, 2008 at 11:21 AM, Sergey Beryozkin
>> <se...@progress.com> wrote:
>>>
>>> Hi,
>>>
>>> I have had a look. At the moment I don't see why we would have to do this
>>> sort of sophisticated handling of continuations in CXF JettyDestination.
>>> With CXF, it's the the code being invoked further down the line (be it
>>> SMX
>>> CXF binding components or application code) which needs to worry about
>>> doing
>>> either suspending or resuming continuations.
>>>
>>> As far as CXF is concerned, it only needs to be able to associate a given
>>> inbound message with a continuation instance. I reckon saving it as a
>>> continuation user object (preserving the previously set one if any) is a
>>> lighter/simpler alternative than introducing maps in the
>>> JettyDestination.
>>>
>>> However, as I said few times earlier in this thread, there's a race
>>> condition which I observe in certain conditions. Specifically, I have a
>>> test
>>> where a continuation is resumed virtually immediately after it's been
>>> suspended so before the code dealing with associating this suspended
>>> continuation with the inbound message has a chance to do it, the
>>> continuation.resume() has already occured. In CXF case I believe it can
>>> happen irrespectively of how we write the code dealing with continuations
>>> under the hood. It won't happen if continuation wrappers are used by the
>>> application code.
>>>
>>> Do you have any comments about this race condition ? Or how a code you
>>> linked to can help to avoid it ?
>>>
>>> Cheers, Sergey
>>>
>>>
>>>
>>>
>>>> I would really encourage you to take a look at the smx code for
>>>> handling continuations.
>>>> We've had quite a hard time to handle race conditions, timeouts etc...
>>>> because the continuation has a timeout and when the message is
>>>> received back around the timeout, things can become a bit tricky.
>>>>
>>>>
>>>>
>>>> https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
>>>>
>>>> We use one concurrent hash map to associate a message id to a
>>>> continuation and multiple synchronization blocks on the continuation
>>>> itself.
>>>> Also the above code can be used with standard servlet servers (i.e.
>>>> when the continuation is a blocking continuation) which is imho a good
>>>> thing.
>>>>
>>>> On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin
>>>> <se...@progress.com> wrote:
>>>>>
>>>>> Hi
>>>>>
>>>>>>>
>>>>>>> I have 10 threads involved, 5 control ones + 5 application ones, I
>>>>>>> see
>>>>>>> a
>>>>>>> loss of message approximately once in 5 cases. The fact that
>>>>>>> cont.resume()
>>>>>>> is done virtually immediately after cont.suspend() can explain it.
>>>>>>
>>>>>> Without seeing your code, I cannot really offer valid suggestions, but
>>>>>> I'll
>>>>>> try....   :-)
>>>>>
>>>>> I guess having it all on a branch would be handy then :-)
>>>>>
>>>>>>
>>>>>> One thought was in the Continuation object, record if "resume()" has
>>>>>> been
>>>>>> called and if it's been callled by the time the stack unwinds back
>>>>>> into
>>>>>> the
>>>>>> Http transport, just re-dispatch immediately.   Either that or have
>>>>>> the
>>>>>> resume block until the http transport sets a "ready to resume" flag
>>>>>> just
>>>>>> before it allows the exception to flow back into jetty.
>>>>>
>>>>> I have 2 tests.
>>>>>
>>>>> In one test an application server code interacts with a wrapper, both
>>>>> when
>>>>> getting a continuation instance and when calling suspend/resume on it
>>>>> (as
>>>>> suggested by yourself earlier in this thread). In this case, under the
>>>>> hood,
>>>>> an inbound message is associated with a continuation instance before
>>>>> suspend() is called on it. Thus even if the resulting exception does
>>>>> not
>>>>> reach Jetty Destination in time before continuation.resume() is called
>>>>> by
>>>>> a
>>>>> control thread, the message is not lost when the HTTP request is
>>>>> resumed
>>>>> as
>>>>> that HTTP request had this continuation instance associated with it at
>>>>> a
>>>>> time ContinuationsSupport.getContinuations(request) was called.
>>>>>
>>>>> In other test which I believe represents an integration scenario with
>>>>> SMX
>>>>> better, an application server code calls Jetty
>>>>> ContinuationsSupport.getContinuations(request) followed by
>>>>> continuation.suspend(). Now, in this case, before a (Jetty
>>>>> RetryRequest)
>>>>> runtime exception reaches a catch block in AbstractInvoker (where I try
>>>>> to
>>>>> associate a message with continuation), one or two control threads
>>>>> manage
>>>>> to
>>>>> squeeze in and call resume() before catch block has even been
>>>>> processed.
>>>>> So
>>>>> by the time the wrapped exception reaches JettyDestination a request
>>>>> with
>>>>> a
>>>>> resumed continuation has already come back...
>>>>>
>>>>> Does this explanation for a second case and the associated race
>>>>> condition
>>>>> sounds reasonable ?
>>>>>
>>>>> Cheers, Sergey
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>> Dan
>>>>>>
>>>>>>>
>>>>>>> Cheers, Sergey
>>>>>>>
>>>>>>> > That said, I'm now trying to inject a message as a custom
>>>>>>> > continuation
>>>>>>> > object (while preserving the original one if any, both ways) as
>>>>>>> > early
>>>>>>> > as
>>>>>>> > possible, in AbstractInvoker, so the time window at which the race
>>>>>>> > condition I talked about earlier can cause the loss of the original
>>>>>>> > message, is extremely small the time it taked for the
>>>>>>> > continuation.suspend() exception to reach a catch block in
>>>>>>> > AbstractInvoker.
>>>>>>> >
>>>>>>> > Cheers, Sergey
>>>>>>> >
>>>>>>> >> Hi,
>>>>>>> >>
>>>>>>> >> I did some system testing with Jetty continuations and it's going
>>>>>>> >> not
>>>>>>> >> too bad. Here's one issue which I've encountered which might or
>>>>>>> >> might
>>>>>>> >> not be a problem in cases where continuations are ustilized
>>>>>>> >> directly
>>>>>>> >> (that is without our wrappers), as in case of say ServiceMix CXF
>>>>>>> >> binding
>>>>>>> >> component.
>>>>>>> >>
>>>>>>> >> The problem is that when continuation.suspend(timeout) has been
>>>>>>> >> called,
>>>>>>> >> a resulting RuntimeException might not reach CXF JettyDestination
>>>>>>> >> (such
>>>>>>> >> that the original message with its phase chain can be preserved
>>>>>>> >> until
>>>>>>> >> the request is resumed) if some other application thread calls
>>>>>>> >> continuation.resume() or continuation suspend timeout expires.
>>>>>>> >>
>>>>>>> >> In case of ServiceMix the latter is a theoretical possibility at
>>>>>>> >> the
>>>>>>> >> least. I can see in its code this timeout is configured, but if
>>>>>>> >> this
>>>>>>> >> timeout is in the region of up to 1 sec or so then it's feasible
>>>>>>> >> that
>>>>>>> >> with a heavy  workload the race condition described above might
>>>>>>> >> come
>>>>>>> >> to
>>>>>>> >> life.
>>>>>>> >>
>>>>>>> >> That said, as part of my test, I found that even when such
>>>>>>> >> condition
>>>>>>> >> occurs, the 'worst' thing which can happen is that a new message
>>>>>>> >> and
>>>>>>> >> a
>>>>>>> >> new chain are created, that is, the request is not resumed from a
>>>>>>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a
>>>>>>> >> new
>>>>>>> >> request alltogether, but it all works nonetheless, as all the
>>>>>>> >> stack
>>>>>>> >> variables used in various interceptors in my given test at least
>>>>>>> >> are
>>>>>>> >> all
>>>>>>> >> obtained from a message. The only downside is that that the work
>>>>>>> >> which
>>>>>>> >> has already been done earlier as part of handling the suspended
>>>>>>> >> request
>>>>>>> >> is repeated again by the interceptors. It can cause issues though
>>>>>>> >> in
>>>>>>> >> cases when some interceptors have sideeffects as part of handling
>>>>>>> >> a
>>>>>>> >> given input request, say modify a db, etc
>>>>>>> >>
>>>>>>> >> Now, this race condition can be safely avoided if a wrapper
>>>>>>> >> proposed
>>>>>>> >> by
>>>>>>> >> Dan is used by a server application code as the message can be
>>>>>>> >> preserved
>>>>>>> >> immediately at a point a user calls suspend on our wrapper, so
>>>>>>> >> without
>>>>>>> >> further doubts I've prototyped it too. It's not possible for SMX
>>>>>>> >> components though
>>>>>>> >>
>>>>>>> >> Comments ?
>>>>>>> >>
>>>>>>> >> Cheers, Sergey
>>>>>>> >>
>>>>>>> >>> I guess my thinking was to tie the continutations directly to the
>>>>>>> >>> PhaseInterceptorChain (since that is going to need to know about
>>>>>>> >>> them
>>>>>>> >>> anyway).   However, I suppose it could easily be done with a new
>>>>>>> >>> interface. Probably the best thing to do is to stub out a sample
>>>>>>> >>> usecase.   So here goes.....
>>>>>>> >>>
>>>>>>> >>> Lets take a "GreetMe" web service that in the greetMe method will
>>>>>>> >>> call
>>>>>>> >>> off asynchrously to some JMS service to actually get the result.
>>>>>>> >>>
>>>>>>> >>> @Resource(name = "jmsClient")
>>>>>>> >>> Greeter jmsGreeter
>>>>>>> >>> @Resource
>>>>>>> >>> WebServiceContext context;
>>>>>>> >>> public String greetMe(String arg) {
>>>>>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>>>>>> >>>              context.get(ContinuationSupport.class.getName());
>>>>>>> >>>     if (contSupport == null) {
>>>>>>> >>>          //continuations not supported, must wait
>>>>>>> >>>          return jmsGreeter.greetMe(arg);
>>>>>>> >>>     }
>>>>>>> >>>     Continuation cont = contSupport.getContinuation();
>>>>>>> >>>     if (cont.isResumed()) {
>>>>>>> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>>>>>>> >>>        return handler.get().getReturn();
>>>>>>> >>>     } else {
>>>>>>> >>>         AsyncHandler<GreetMeResponse> handler = new
>>>>>>> >>> Handler(cont);
>>>>>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>>>>>> >>>         cont.suspend(handler);
>>>>>>> >>> return null;   //won't actually get here as suspend will throw a
>>>>>>> >>> ContinuationException
>>>>>>> >>>     }
>>>>>>> >>> }
>>>>>>> >>>
>>>>>>> >>> The Handler would look something like:
>>>>>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>>>>>> >>> GreetMeResponse resp;
>>>>>>> >>>        Continuation cont;
>>>>>>> >>> public Handler(Continuation cont) {
>>>>>>> >>>            this.cont = cont;
>>>>>>> >>>        }
>>>>>>> >>>        public void handleResponse(Response<GreetMeLaterResponse>
>>>>>>> >>> response) { resp = response.get();
>>>>>>> >>>              cont.resume();
>>>>>>> >>>       }
>>>>>>> >>> }
>>>>>>> >>>
>>>>>>> >>> Basically, the HTTP/Jetty transport could provide an
>>>>>>> >>> implementation
>>>>>>> >>> of
>>>>>>> >>> ContinuationSupport that wrappers the jetty stuff.    JMS could
>>>>>>> >>> provide
>>>>>>> >>> one that's pretty much a null op.   Transports that cannot
>>>>>>> >>> support
>>>>>>> >>> it
>>>>>>> >>> (like servlet) just wouldn't provide an implementation.
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> Does that make sense?   Other ideas?
>>>>>>> >>>
>>>>>>> >>> Dan
>>>>>>> >>>
>>>>>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>>>>>> >>>> > No.   We don't want that.   Whatever we do should work for
>>>>>>> >>>> > other
>>>>>>> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to
>>>>>>> >>>> > jetty
>>>>>>> >>>> > continuations directly.
>>>>>>> >>>>
>>>>>>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
>>>>>>> >>>> Ex.
>>>>>>> >>>>
>>>>>>> >>>> try {
>>>>>>> >>>>   invoke(); // continuation.suspend() somehow by the code being
>>>>>>> >>>> invoked upon }
>>>>>>> >>>> catch (RuntimeException ex) {
>>>>>>> >>>>
>>>>>>> >>>> if
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>>>>>> >>>> throw new SuspendedFault(ex);
>>>>>>> >>>>     // or PhaseInterceptorChain.suspend()
>>>>>>> >>>> }
>>>>>>> >>>> }
>>>>>>> >>>>
>>>>>>> >>>> > Most likely, we could add a "suspend()" method to
>>>>>>> >>>> > PhaseInterceptorChain that would do something very similar and
>>>>>>> >>>> > throw
>>>>>>> >>>> > a "SuspendException" or something in the same package as
>>>>>>> >>>> > PhaseInterceptorChain.
>>>>>>> >>>>
>>>>>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call
>>>>>>> >>>> though
>>>>>>> >>>> ?
>>>>>>> >>>>
>>>>>>> >>>> >   That would get propogated
>>>>>>> >>>> > back to the JettyDestination that could then call the jetty
>>>>>>> >>>> > things.
>>>>>>> >>>> >  The JMS transport could just catch it and more or less ignore
>>>>>>> >>>> > it.
>>>>>>> >>>> >  We'd then have to add a "resume()" method to the chain which
>>>>>>> >>>> > would
>>>>>>> >>>> > call back onto a listener that the transport provides.   Jetty
>>>>>>> >>>> > would
>>>>>>> >>>> > just call the jetty resume stuff. JMS would probably put a
>>>>>>> >>>> > runnable
>>>>>>> >>>> > on the workqueue to restart the chain.
>>>>>>> >>>>
>>>>>>> >>>> ok
>>>>>>> >>>>
>>>>>>> >>>> > Also, suspend() would need to check if there is a listener.
>>>>>>> >>>> >  If
>>>>>>> >>>> > not,
>>>>>>> >>>> > it should not throw the exception.   Thus, the servlet
>>>>>>> >>>> > transport
>>>>>>> >>>> > and
>>>>>>> >>>> > CORBA stuff that couldn't do this would pretty much just
>>>>>>> >>>> > ignore
>>>>>>> >>>> > it.
>>>>>>> >>>>
>>>>>>> >>>> ok, not sure I understand about the listener but I think I see
>>>>>>> >>>> what
>>>>>>> >>>> you mean...
>>>>>>> >>>>
>>>>>>> >>>> > Basically, this needs to be done in such a way that it CAN
>>>>>>> >>>> > work
>>>>>>> >>>> > for
>>>>>>> >>>> > the non-jetty cases.   However, it also needs to be done in a
>>>>>>> >>>> > way
>>>>>>> >>>> > that doesn't affect existing transports.
>>>>>>> >>>>
>>>>>>> >>>> +1
>>>>>>> >>>>
>>>>>>> >>>> Cheers, Sergey
>>>>>>> >>>>
>>>>>>> >>>> > Dan
>>>>>>> >>>> >
>>>>>>> >>>> >> 2. Now, if the above can be figured out, the next problem
>>>>>>> >>>> >> arises:
>>>>>>> >>>> >> when the "trigger" to wake up the continuation occurs
>>>>>>> >>>> >>
>>>>>>> >>>> >> I think we can can do in JettyDestination omething similar to
>>>>>>> >>>> >> what
>>>>>>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we
>>>>>>> >>>> >> can
>>>>>>> >>>> >> extract from it the original continuation instance or else we
>>>>>>> >>>> >> can
>>>>>>> >>>> >> do ContinuationSupport.getContinuation(request) which should
>>>>>>> >>>> >> return
>>>>>>> >>>> >> us the instance. At this point we can use it as a ket to
>>>>>>> >>>> >> store
>>>>>>> >>>> >> the
>>>>>>> >>>> >> current exchange plus all the other info we may need.
>>>>>>> >>>> >>
>>>>>>> >>>> >> When the user/application code does continuation.resume(),
>>>>>>> >>>> >> the
>>>>>>> >>>> >> Jetty thread will come back and we will use the
>>>>>>> >>>> >> ContinuationSupport.getContinuation(request) to get us the
>>>>>>> >>>> >> active
>>>>>>> >>>> >> continuation and use it to extract the suspended exchange and
>>>>>>> >>>> >> proceed from there, say we'll call
>>>>>>> >>>> >> PhaseInterceptorPhase.resume(),
>>>>>>> >>>> >> etc, something along the lines you suggested
>>>>>>> >>>> >>
>>>>>>> >>>> >>
>>>>>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty
>>>>>>> >>>> >> much
>>>>>>> >>>> >> everything to make sure nothing is stored on the stack and is
>>>>>>> >>>> >> "resumable". Once that is done, the rest is relatively easy.
>>>>>>> >>>> >>
>>>>>>> >>>> >> Yea - probably can be the quite challenging
>>>>>>> >>>> >>
>>>>>>> >>>> >>
>>>>>>> >>>> >> Thoughts ?
>>>>>>> >>>> >>
>>>>>>> >>>> >> Cheers, Sergey
>>>>>>> >>>> >>
>>>>>>> >>>> >>
>>>>>>> >>>> >>
>>>>>>> >>>> >>
>>>>>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>>>>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>>>>>> >>>> >> [3]
>>>>>>> >>>> >>
>>>>>>> >>>> >>
>>>>>>> >>>> >>
>>>>>>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
>>>>>>> >>>> >>42361 #ac tion_12642361
>>>>>>> >>>> >
>>>>>>> >>>> > --
>>>>>>> >>>> > Daniel Kulp
>>>>>>> >>>> > dkulp@apache.org
>>>>>>> >>>> > http://dankulp.com/blog
>>>>>>> >>>
>>>>>>> >>> --
>>>>>>> >>> Daniel Kulp
>>>>>>> >>> dkulp@apache.org
>>>>>>> >>> http://dankulp.com/blog
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Daniel Kulp
>>>>>> dkulp@apache.org
>>>>>> http://dankulp.com/blog
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> Guillaume Nodet
>>>> ------------------------
>>>> Blog: http://gnodet.blogspot.com/
>>>> ------------------------
>>>> Open Source SOA
>>>> http://fusesource.com
>>>
>>>
>>
>>
>>
>> --
>> Cheers,
>> Guillaume Nodet
>> ------------------------
>> Blog: http://gnodet.blogspot.com/
>> ------------------------
>> Open Source SOA
>> http://fusesource.com
>
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com

Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
Hi


> You need to associate the continuation with the message before the
> suspend() method is called, so that whenever the message is ready, it
> will work.

Yes - agreed. But as I said we have two cases.

Case1.

Jetty -> CXF -> test code/some other code does direct jetty continuation.suspend()

In this case it's essentially a user code which does it and a user code has no notion of internal CXF Message class. It just invokes 
on a jetty continuation. It's this code which will do suspend/resume and it's in this case when there's a race condition between the 
moment a user (my test) code does continuation.suspend() (on one thread) and immediately after that continuation.resume() on the 
other one.

See what I mean ? is it how we can expect the SMX CXF binding component interacting with Jetty continuations (apart from it not 
doing resume() immediately I guess) ?

So before a suspended runtime exception reaches the nearest catch block in the CXF code where we can get a chance to do something to 
preserve the state of the given invocation, resume() might've alreadty occurred.
Case 2.

Jetty -> CXF -> test code/some other code interacts with continuations in a transport-neutral way through CXF provided wrappers. 
Now, in this case what happens is that we do preseve the message before doing suspend(), as you suggested, so everything goes fine.

> Also, I think you will have to care about timeouts ...

Why ? It's a CXF user code which calls suspend(). In CXF Jetty Destination I attempt to get a message from a 
ContinuationSupport.getContinuation(). If the returned continuation is not new and it has no message associated with it then there's 
really nothing CXF can do but to procede with a new invocation, irrespectively of wheteher this continuation was resumed or 
timed-out. It may throw an exception in this case but for now I prefer to log a warning as the things seems to be working anyway - 
it will be up to a user code to do some more drastic actions.

It's likely I'm missing some subtle or even obvious details but for now things seem to be quote clear to me.

> Another thing: it would be nice if you could create a branch and
> commit your ongoing work there so that we can have something more
> tangible to discuss on ... ;-)  We may has well just drop it later, it
> does not really matter.

sorry - I see it would really help to discuss things better

Cheers, Sergey

>
> On Wed, Nov 12, 2008 at 11:21 AM, Sergey Beryozkin
> <se...@progress.com> wrote:
>> Hi,
>>
>> I have had a look. At the moment I don't see why we would have to do this
>> sort of sophisticated handling of continuations in CXF JettyDestination.
>> With CXF, it's the the code being invoked further down the line (be it SMX
>> CXF binding components or application code) which needs to worry about doing
>> either suspending or resuming continuations.
>>
>> As far as CXF is concerned, it only needs to be able to associate a given
>> inbound message with a continuation instance. I reckon saving it as a
>> continuation user object (preserving the previously set one if any) is a
>> lighter/simpler alternative than introducing maps in the JettyDestination.
>>
>> However, as I said few times earlier in this thread, there's a race
>> condition which I observe in certain conditions. Specifically, I have a test
>> where a continuation is resumed virtually immediately after it's been
>> suspended so before the code dealing with associating this suspended
>> continuation with the inbound message has a chance to do it, the
>> continuation.resume() has already occured. In CXF case I believe it can
>> happen irrespectively of how we write the code dealing with continuations
>> under the hood. It won't happen if continuation wrappers are used by the
>> application code.
>>
>> Do you have any comments about this race condition ? Or how a code you
>> linked to can help to avoid it ?
>>
>> Cheers, Sergey
>>
>>
>>
>>
>>> I would really encourage you to take a look at the smx code for
>>> handling continuations.
>>> We've had quite a hard time to handle race conditions, timeouts etc...
>>> because the continuation has a timeout and when the message is
>>> received back around the timeout, things can become a bit tricky.
>>>
>>>
>>> https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
>>>
>>> We use one concurrent hash map to associate a message id to a
>>> continuation and multiple synchronization blocks on the continuation
>>> itself.
>>> Also the above code can be used with standard servlet servers (i.e.
>>> when the continuation is a blocking continuation) which is imho a good
>>> thing.
>>>
>>> On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin
>>> <se...@progress.com> wrote:
>>>>
>>>> Hi
>>>>
>>>>>>
>>>>>> I have 10 threads involved, 5 control ones + 5 application ones, I see
>>>>>> a
>>>>>> loss of message approximately once in 5 cases. The fact that
>>>>>> cont.resume()
>>>>>> is done virtually immediately after cont.suspend() can explain it.
>>>>>
>>>>> Without seeing your code, I cannot really offer valid suggestions, but
>>>>> I'll
>>>>> try....   :-)
>>>>
>>>> I guess having it all on a branch would be handy then :-)
>>>>
>>>>>
>>>>> One thought was in the Continuation object, record if "resume()" has
>>>>> been
>>>>> called and if it's been callled by the time the stack unwinds back into
>>>>> the
>>>>> Http transport, just re-dispatch immediately.   Either that or have the
>>>>> resume block until the http transport sets a "ready to resume" flag just
>>>>> before it allows the exception to flow back into jetty.
>>>>
>>>> I have 2 tests.
>>>>
>>>> In one test an application server code interacts with a wrapper, both
>>>> when
>>>> getting a continuation instance and when calling suspend/resume on it (as
>>>> suggested by yourself earlier in this thread). In this case, under the
>>>> hood,
>>>> an inbound message is associated with a continuation instance before
>>>> suspend() is called on it. Thus even if the resulting exception does not
>>>> reach Jetty Destination in time before continuation.resume() is called by
>>>> a
>>>> control thread, the message is not lost when the HTTP request is resumed
>>>> as
>>>> that HTTP request had this continuation instance associated with it at a
>>>> time ContinuationsSupport.getContinuations(request) was called.
>>>>
>>>> In other test which I believe represents an integration scenario with SMX
>>>> better, an application server code calls Jetty
>>>> ContinuationsSupport.getContinuations(request) followed by
>>>> continuation.suspend(). Now, in this case, before a (Jetty RetryRequest)
>>>> runtime exception reaches a catch block in AbstractInvoker (where I try
>>>> to
>>>> associate a message with continuation), one or two control threads manage
>>>> to
>>>> squeeze in and call resume() before catch block has even been processed.
>>>> So
>>>> by the time the wrapped exception reaches JettyDestination a request with
>>>> a
>>>> resumed continuation has already come back...
>>>>
>>>> Does this explanation for a second case and the associated race condition
>>>> sounds reasonable ?
>>>>
>>>> Cheers, Sergey
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>> Dan
>>>>>
>>>>>>
>>>>>> Cheers, Sergey
>>>>>>
>>>>>> > That said, I'm now trying to inject a message as a custom
>>>>>> > continuation
>>>>>> > object (while preserving the original one if any, both ways) as early
>>>>>> > as
>>>>>> > possible, in AbstractInvoker, so the time window at which the race
>>>>>> > condition I talked about earlier can cause the loss of the original
>>>>>> > message, is extremely small the time it taked for the
>>>>>> > continuation.suspend() exception to reach a catch block in
>>>>>> > AbstractInvoker.
>>>>>> >
>>>>>> > Cheers, Sergey
>>>>>> >
>>>>>> >> Hi,
>>>>>> >>
>>>>>> >> I did some system testing with Jetty continuations and it's going
>>>>>> >> not
>>>>>> >> too bad. Here's one issue which I've encountered which might or
>>>>>> >> might
>>>>>> >> not be a problem in cases where continuations are ustilized directly
>>>>>> >> (that is without our wrappers), as in case of say ServiceMix CXF
>>>>>> >> binding
>>>>>> >> component.
>>>>>> >>
>>>>>> >> The problem is that when continuation.suspend(timeout) has been
>>>>>> >> called,
>>>>>> >> a resulting RuntimeException might not reach CXF JettyDestination
>>>>>> >> (such
>>>>>> >> that the original message with its phase chain can be preserved
>>>>>> >> until
>>>>>> >> the request is resumed) if some other application thread calls
>>>>>> >> continuation.resume() or continuation suspend timeout expires.
>>>>>> >>
>>>>>> >> In case of ServiceMix the latter is a theoretical possibility at the
>>>>>> >> least. I can see in its code this timeout is configured, but if this
>>>>>> >> timeout is in the region of up to 1 sec or so then it's feasible
>>>>>> >> that
>>>>>> >> with a heavy  workload the race condition described above might come
>>>>>> >> to
>>>>>> >> life.
>>>>>> >>
>>>>>> >> That said, as part of my test, I found that even when such condition
>>>>>> >> occurs, the 'worst' thing which can happen is that a new message and
>>>>>> >> a
>>>>>> >> new chain are created, that is, the request is not resumed from a
>>>>>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
>>>>>> >> request alltogether, but it all works nonetheless, as all the stack
>>>>>> >> variables used in various interceptors in my given test at least are
>>>>>> >> all
>>>>>> >> obtained from a message. The only downside is that that the work
>>>>>> >> which
>>>>>> >> has already been done earlier as part of handling the suspended
>>>>>> >> request
>>>>>> >> is repeated again by the interceptors. It can cause issues though in
>>>>>> >> cases when some interceptors have sideeffects as part of handling a
>>>>>> >> given input request, say modify a db, etc
>>>>>> >>
>>>>>> >> Now, this race condition can be safely avoided if a wrapper proposed
>>>>>> >> by
>>>>>> >> Dan is used by a server application code as the message can be
>>>>>> >> preserved
>>>>>> >> immediately at a point a user calls suspend on our wrapper, so
>>>>>> >> without
>>>>>> >> further doubts I've prototyped it too. It's not possible for SMX
>>>>>> >> components though
>>>>>> >>
>>>>>> >> Comments ?
>>>>>> >>
>>>>>> >> Cheers, Sergey
>>>>>> >>
>>>>>> >>> I guess my thinking was to tie the continutations directly to the
>>>>>> >>> PhaseInterceptorChain (since that is going to need to know about
>>>>>> >>> them
>>>>>> >>> anyway).   However, I suppose it could easily be done with a new
>>>>>> >>> interface. Probably the best thing to do is to stub out a sample
>>>>>> >>> usecase.   So here goes.....
>>>>>> >>>
>>>>>> >>> Lets take a "GreetMe" web service that in the greetMe method will
>>>>>> >>> call
>>>>>> >>> off asynchrously to some JMS service to actually get the result.
>>>>>> >>>
>>>>>> >>> @Resource(name = "jmsClient")
>>>>>> >>> Greeter jmsGreeter
>>>>>> >>> @Resource
>>>>>> >>> WebServiceContext context;
>>>>>> >>> public String greetMe(String arg) {
>>>>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>>>>> >>>              context.get(ContinuationSupport.class.getName());
>>>>>> >>>     if (contSupport == null) {
>>>>>> >>>          //continuations not supported, must wait
>>>>>> >>>          return jmsGreeter.greetMe(arg);
>>>>>> >>>     }
>>>>>> >>>     Continuation cont = contSupport.getContinuation();
>>>>>> >>>     if (cont.isResumed()) {
>>>>>> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>>>>>> >>>        return handler.get().getReturn();
>>>>>> >>>     } else {
>>>>>> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>>>>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>>>>> >>>         cont.suspend(handler);
>>>>>> >>> return null;   //won't actually get here as suspend will throw a
>>>>>> >>> ContinuationException
>>>>>> >>>     }
>>>>>> >>> }
>>>>>> >>>
>>>>>> >>> The Handler would look something like:
>>>>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>>>>> >>> GreetMeResponse resp;
>>>>>> >>>        Continuation cont;
>>>>>> >>> public Handler(Continuation cont) {
>>>>>> >>>            this.cont = cont;
>>>>>> >>>        }
>>>>>> >>>        public void handleResponse(Response<GreetMeLaterResponse>
>>>>>> >>> response) { resp = response.get();
>>>>>> >>>              cont.resume();
>>>>>> >>>       }
>>>>>> >>> }
>>>>>> >>>
>>>>>> >>> Basically, the HTTP/Jetty transport could provide an implementation
>>>>>> >>> of
>>>>>> >>> ContinuationSupport that wrappers the jetty stuff.    JMS could
>>>>>> >>> provide
>>>>>> >>> one that's pretty much a null op.   Transports that cannot support
>>>>>> >>> it
>>>>>> >>> (like servlet) just wouldn't provide an implementation.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Does that make sense?   Other ideas?
>>>>>> >>>
>>>>>> >>> Dan
>>>>>> >>>
>>>>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>>>>> >>>> > No.   We don't want that.   Whatever we do should work for other
>>>>>> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to
>>>>>> >>>> > jetty
>>>>>> >>>> > continuations directly.
>>>>>> >>>>
>>>>>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
>>>>>> >>>> Ex.
>>>>>> >>>>
>>>>>> >>>> try {
>>>>>> >>>>   invoke(); // continuation.suspend() somehow by the code being
>>>>>> >>>> invoked upon }
>>>>>> >>>> catch (RuntimeException ex) {
>>>>>> >>>>
>>>>>> >>>> if
>>>>>> >>>>
>>>>>> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>>>>> >>>> throw new SuspendedFault(ex);
>>>>>> >>>>     // or PhaseInterceptorChain.suspend()
>>>>>> >>>> }
>>>>>> >>>> }
>>>>>> >>>>
>>>>>> >>>> > Most likely, we could add a "suspend()" method to
>>>>>> >>>> > PhaseInterceptorChain that would do something very similar and
>>>>>> >>>> > throw
>>>>>> >>>> > a "SuspendException" or something in the same package as
>>>>>> >>>> > PhaseInterceptorChain.
>>>>>> >>>>
>>>>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call
>>>>>> >>>> though
>>>>>> >>>> ?
>>>>>> >>>>
>>>>>> >>>> >   That would get propogated
>>>>>> >>>> > back to the JettyDestination that could then call the jetty
>>>>>> >>>> > things.
>>>>>> >>>> >  The JMS transport could just catch it and more or less ignore
>>>>>> >>>> > it.
>>>>>> >>>> >  We'd then have to add a "resume()" method to the chain which
>>>>>> >>>> > would
>>>>>> >>>> > call back onto a listener that the transport provides.   Jetty
>>>>>> >>>> > would
>>>>>> >>>> > just call the jetty resume stuff. JMS would probably put a
>>>>>> >>>> > runnable
>>>>>> >>>> > on the workqueue to restart the chain.
>>>>>> >>>>
>>>>>> >>>> ok
>>>>>> >>>>
>>>>>> >>>> > Also, suspend() would need to check if there is a listener.  If
>>>>>> >>>> > not,
>>>>>> >>>> > it should not throw the exception.   Thus, the servlet transport
>>>>>> >>>> > and
>>>>>> >>>> > CORBA stuff that couldn't do this would pretty much just ignore
>>>>>> >>>> > it.
>>>>>> >>>>
>>>>>> >>>> ok, not sure I understand about the listener but I think I see
>>>>>> >>>> what
>>>>>> >>>> you mean...
>>>>>> >>>>
>>>>>> >>>> > Basically, this needs to be done in such a way that it CAN work
>>>>>> >>>> > for
>>>>>> >>>> > the non-jetty cases.   However, it also needs to be done in a
>>>>>> >>>> > way
>>>>>> >>>> > that doesn't affect existing transports.
>>>>>> >>>>
>>>>>> >>>> +1
>>>>>> >>>>
>>>>>> >>>> Cheers, Sergey
>>>>>> >>>>
>>>>>> >>>> > Dan
>>>>>> >>>> >
>>>>>> >>>> >> 2. Now, if the above can be figured out, the next problem
>>>>>> >>>> >> arises:
>>>>>> >>>> >> when the "trigger" to wake up the continuation occurs
>>>>>> >>>> >>
>>>>>> >>>> >> I think we can can do in JettyDestination omething similar to
>>>>>> >>>> >> what
>>>>>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>>>>>> >>>> >> extract from it the original continuation instance or else we
>>>>>> >>>> >> can
>>>>>> >>>> >> do ContinuationSupport.getContinuation(request) which should
>>>>>> >>>> >> return
>>>>>> >>>> >> us the instance. At this point we can use it as a ket to store
>>>>>> >>>> >> the
>>>>>> >>>> >> current exchange plus all the other info we may need.
>>>>>> >>>> >>
>>>>>> >>>> >> When the user/application code does continuation.resume(), the
>>>>>> >>>> >> Jetty thread will come back and we will use the
>>>>>> >>>> >> ContinuationSupport.getContinuation(request) to get us the
>>>>>> >>>> >> active
>>>>>> >>>> >> continuation and use it to extract the suspended exchange and
>>>>>> >>>> >> proceed from there, say we'll call
>>>>>> >>>> >> PhaseInterceptorPhase.resume(),
>>>>>> >>>> >> etc, something along the lines you suggested
>>>>>> >>>> >>
>>>>>> >>>> >>
>>>>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty
>>>>>> >>>> >> much
>>>>>> >>>> >> everything to make sure nothing is stored on the stack and is
>>>>>> >>>> >> "resumable". Once that is done, the rest is relatively easy.
>>>>>> >>>> >>
>>>>>> >>>> >> Yea - probably can be the quite challenging
>>>>>> >>>> >>
>>>>>> >>>> >>
>>>>>> >>>> >> Thoughts ?
>>>>>> >>>> >>
>>>>>> >>>> >> Cheers, Sergey
>>>>>> >>>> >>
>>>>>> >>>> >>
>>>>>> >>>> >>
>>>>>> >>>> >>
>>>>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>>>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>>>>> >>>> >> [3]
>>>>>> >>>> >>
>>>>>> >>>> >>
>>>>>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
>>>>>> >>>> >>42361 #ac tion_12642361
>>>>>> >>>> >
>>>>>> >>>> > --
>>>>>> >>>> > Daniel Kulp
>>>>>> >>>> > dkulp@apache.org
>>>>>> >>>> > http://dankulp.com/blog
>>>>>> >>>
>>>>>> >>> --
>>>>>> >>> Daniel Kulp
>>>>>> >>> dkulp@apache.org
>>>>>> >>> http://dankulp.com/blog
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Daniel Kulp
>>>>> dkulp@apache.org
>>>>> http://dankulp.com/blog
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Cheers,
>>> Guillaume Nodet
>>> ------------------------
>>> Blog: http://gnodet.blogspot.com/
>>> ------------------------
>>> Open Source SOA
>>> http://fusesource.com
>>
>>
>
>
>
> -- 
> Cheers,
> Guillaume Nodet
> ------------------------
> Blog: http://gnodet.blogspot.com/
> ------------------------
> Open Source SOA
> http://fusesource.com 


Re: Jetty Continuations in CXF

Posted by Guillaume Nodet <gn...@gmail.com>.
You need to associate the continuation with the message before the
suspend() method is called, so that whenever the message is ready, it
will work.
Also, I think you will have to care about timeouts ...
Another thing: it would be nice if you could create a branch and
commit your ongoing work there so that we can have something more
tangible to discuss on ... ;-)  We may has well just drop it later, it
does not really matter.

On Wed, Nov 12, 2008 at 11:21 AM, Sergey Beryozkin
<se...@progress.com> wrote:
> Hi,
>
> I have had a look. At the moment I don't see why we would have to do this
> sort of sophisticated handling of continuations in CXF JettyDestination.
> With CXF, it's the the code being invoked further down the line (be it SMX
> CXF binding components or application code) which needs to worry about doing
> either suspending or resuming continuations.
>
> As far as CXF is concerned, it only needs to be able to associate a given
> inbound message with a continuation instance. I reckon saving it as a
> continuation user object (preserving the previously set one if any) is a
> lighter/simpler alternative than introducing maps in the JettyDestination.
>
> However, as I said few times earlier in this thread, there's a race
> condition which I observe in certain conditions. Specifically, I have a test
> where a continuation is resumed virtually immediately after it's been
> suspended so before the code dealing with associating this suspended
> continuation with the inbound message has a chance to do it, the
> continuation.resume() has already occured. In CXF case I believe it can
> happen irrespectively of how we write the code dealing with continuations
> under the hood. It won't happen if continuation wrappers are used by the
> application code.
>
> Do you have any comments about this race condition ? Or how a code you
> linked to can help to avoid it ?
>
> Cheers, Sergey
>
>
>
>
>> I would really encourage you to take a look at the smx code for
>> handling continuations.
>> We've had quite a hard time to handle race conditions, timeouts etc...
>> because the continuation has a timeout and when the message is
>> received back around the timeout, things can become a bit tricky.
>>
>>
>> https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
>>
>> We use one concurrent hash map to associate a message id to a
>> continuation and multiple synchronization blocks on the continuation
>> itself.
>> Also the above code can be used with standard servlet servers (i.e.
>> when the continuation is a blocking continuation) which is imho a good
>> thing.
>>
>> On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin
>> <se...@progress.com> wrote:
>>>
>>> Hi
>>>
>>>>>
>>>>> I have 10 threads involved, 5 control ones + 5 application ones, I see
>>>>> a
>>>>> loss of message approximately once in 5 cases. The fact that
>>>>> cont.resume()
>>>>> is done virtually immediately after cont.suspend() can explain it.
>>>>
>>>> Without seeing your code, I cannot really offer valid suggestions, but
>>>> I'll
>>>> try....   :-)
>>>
>>> I guess having it all on a branch would be handy then :-)
>>>
>>>>
>>>> One thought was in the Continuation object, record if "resume()" has
>>>> been
>>>> called and if it's been callled by the time the stack unwinds back into
>>>> the
>>>> Http transport, just re-dispatch immediately.   Either that or have the
>>>> resume block until the http transport sets a "ready to resume" flag just
>>>> before it allows the exception to flow back into jetty.
>>>
>>> I have 2 tests.
>>>
>>> In one test an application server code interacts with a wrapper, both
>>> when
>>> getting a continuation instance and when calling suspend/resume on it (as
>>> suggested by yourself earlier in this thread). In this case, under the
>>> hood,
>>> an inbound message is associated with a continuation instance before
>>> suspend() is called on it. Thus even if the resulting exception does not
>>> reach Jetty Destination in time before continuation.resume() is called by
>>> a
>>> control thread, the message is not lost when the HTTP request is resumed
>>> as
>>> that HTTP request had this continuation instance associated with it at a
>>> time ContinuationsSupport.getContinuations(request) was called.
>>>
>>> In other test which I believe represents an integration scenario with SMX
>>> better, an application server code calls Jetty
>>> ContinuationsSupport.getContinuations(request) followed by
>>> continuation.suspend(). Now, in this case, before a (Jetty RetryRequest)
>>> runtime exception reaches a catch block in AbstractInvoker (where I try
>>> to
>>> associate a message with continuation), one or two control threads manage
>>> to
>>> squeeze in and call resume() before catch block has even been processed.
>>> So
>>> by the time the wrapped exception reaches JettyDestination a request with
>>> a
>>> resumed continuation has already come back...
>>>
>>> Does this explanation for a second case and the associated race condition
>>> sounds reasonable ?
>>>
>>> Cheers, Sergey
>>>
>>>
>>>
>>>
>>>
>>>>
>>>>
>>>> Dan
>>>>
>>>>>
>>>>> Cheers, Sergey
>>>>>
>>>>> > That said, I'm now trying to inject a message as a custom
>>>>> > continuation
>>>>> > object (while preserving the original one if any, both ways) as early
>>>>> > as
>>>>> > possible, in AbstractInvoker, so the time window at which the race
>>>>> > condition I talked about earlier can cause the loss of the original
>>>>> > message, is extremely small the time it taked for the
>>>>> > continuation.suspend() exception to reach a catch block in
>>>>> > AbstractInvoker.
>>>>> >
>>>>> > Cheers, Sergey
>>>>> >
>>>>> >> Hi,
>>>>> >>
>>>>> >> I did some system testing with Jetty continuations and it's going
>>>>> >> not
>>>>> >> too bad. Here's one issue which I've encountered which might or
>>>>> >> might
>>>>> >> not be a problem in cases where continuations are ustilized directly
>>>>> >> (that is without our wrappers), as in case of say ServiceMix CXF
>>>>> >> binding
>>>>> >> component.
>>>>> >>
>>>>> >> The problem is that when continuation.suspend(timeout) has been
>>>>> >> called,
>>>>> >> a resulting RuntimeException might not reach CXF JettyDestination
>>>>> >> (such
>>>>> >> that the original message with its phase chain can be preserved
>>>>> >> until
>>>>> >> the request is resumed) if some other application thread calls
>>>>> >> continuation.resume() or continuation suspend timeout expires.
>>>>> >>
>>>>> >> In case of ServiceMix the latter is a theoretical possibility at the
>>>>> >> least. I can see in its code this timeout is configured, but if this
>>>>> >> timeout is in the region of up to 1 sec or so then it's feasible
>>>>> >> that
>>>>> >> with a heavy  workload the race condition described above might come
>>>>> >> to
>>>>> >> life.
>>>>> >>
>>>>> >> That said, as part of my test, I found that even when such condition
>>>>> >> occurs, the 'worst' thing which can happen is that a new message and
>>>>> >> a
>>>>> >> new chain are created, that is, the request is not resumed from a
>>>>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
>>>>> >> request alltogether, but it all works nonetheless, as all the stack
>>>>> >> variables used in various interceptors in my given test at least are
>>>>> >> all
>>>>> >> obtained from a message. The only downside is that that the work
>>>>> >> which
>>>>> >> has already been done earlier as part of handling the suspended
>>>>> >> request
>>>>> >> is repeated again by the interceptors. It can cause issues though in
>>>>> >> cases when some interceptors have sideeffects as part of handling a
>>>>> >> given input request, say modify a db, etc
>>>>> >>
>>>>> >> Now, this race condition can be safely avoided if a wrapper proposed
>>>>> >> by
>>>>> >> Dan is used by a server application code as the message can be
>>>>> >> preserved
>>>>> >> immediately at a point a user calls suspend on our wrapper, so
>>>>> >> without
>>>>> >> further doubts I've prototyped it too. It's not possible for SMX
>>>>> >> components though
>>>>> >>
>>>>> >> Comments ?
>>>>> >>
>>>>> >> Cheers, Sergey
>>>>> >>
>>>>> >>> I guess my thinking was to tie the continutations directly to the
>>>>> >>> PhaseInterceptorChain (since that is going to need to know about
>>>>> >>> them
>>>>> >>> anyway).   However, I suppose it could easily be done with a new
>>>>> >>> interface. Probably the best thing to do is to stub out a sample
>>>>> >>> usecase.   So here goes.....
>>>>> >>>
>>>>> >>> Lets take a "GreetMe" web service that in the greetMe method will
>>>>> >>> call
>>>>> >>> off asynchrously to some JMS service to actually get the result.
>>>>> >>>
>>>>> >>> @Resource(name = "jmsClient")
>>>>> >>> Greeter jmsGreeter
>>>>> >>> @Resource
>>>>> >>> WebServiceContext context;
>>>>> >>> public String greetMe(String arg) {
>>>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>>>> >>>              context.get(ContinuationSupport.class.getName());
>>>>> >>>     if (contSupport == null) {
>>>>> >>>          //continuations not supported, must wait
>>>>> >>>          return jmsGreeter.greetMe(arg);
>>>>> >>>     }
>>>>> >>>     Continuation cont = contSupport.getContinuation();
>>>>> >>>     if (cont.isResumed()) {
>>>>> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>>>>> >>>        return handler.get().getReturn();
>>>>> >>>     } else {
>>>>> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>>>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>>>> >>>         cont.suspend(handler);
>>>>> >>> return null;   //won't actually get here as suspend will throw a
>>>>> >>> ContinuationException
>>>>> >>>     }
>>>>> >>> }
>>>>> >>>
>>>>> >>> The Handler would look something like:
>>>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>>>> >>> GreetMeResponse resp;
>>>>> >>>        Continuation cont;
>>>>> >>> public Handler(Continuation cont) {
>>>>> >>>            this.cont = cont;
>>>>> >>>        }
>>>>> >>>        public void handleResponse(Response<GreetMeLaterResponse>
>>>>> >>> response) { resp = response.get();
>>>>> >>>              cont.resume();
>>>>> >>>       }
>>>>> >>> }
>>>>> >>>
>>>>> >>> Basically, the HTTP/Jetty transport could provide an implementation
>>>>> >>> of
>>>>> >>> ContinuationSupport that wrappers the jetty stuff.    JMS could
>>>>> >>> provide
>>>>> >>> one that's pretty much a null op.   Transports that cannot support
>>>>> >>> it
>>>>> >>> (like servlet) just wouldn't provide an implementation.
>>>>> >>>
>>>>> >>>
>>>>> >>> Does that make sense?   Other ideas?
>>>>> >>>
>>>>> >>> Dan
>>>>> >>>
>>>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>>>> >>>> > No.   We don't want that.   Whatever we do should work for other
>>>>> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to
>>>>> >>>> > jetty
>>>>> >>>> > continuations directly.
>>>>> >>>>
>>>>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
>>>>> >>>> Ex.
>>>>> >>>>
>>>>> >>>> try {
>>>>> >>>>   invoke(); // continuation.suspend() somehow by the code being
>>>>> >>>> invoked upon }
>>>>> >>>> catch (RuntimeException ex) {
>>>>> >>>>
>>>>> >>>> if
>>>>> >>>>
>>>>> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>>>> >>>> throw new SuspendedFault(ex);
>>>>> >>>>     // or PhaseInterceptorChain.suspend()
>>>>> >>>> }
>>>>> >>>> }
>>>>> >>>>
>>>>> >>>> > Most likely, we could add a "suspend()" method to
>>>>> >>>> > PhaseInterceptorChain that would do something very similar and
>>>>> >>>> > throw
>>>>> >>>> > a "SuspendException" or something in the same package as
>>>>> >>>> > PhaseInterceptorChain.
>>>>> >>>>
>>>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call
>>>>> >>>> though
>>>>> >>>> ?
>>>>> >>>>
>>>>> >>>> >   That would get propogated
>>>>> >>>> > back to the JettyDestination that could then call the jetty
>>>>> >>>> > things.
>>>>> >>>> >  The JMS transport could just catch it and more or less ignore
>>>>> >>>> > it.
>>>>> >>>> >  We'd then have to add a "resume()" method to the chain which
>>>>> >>>> > would
>>>>> >>>> > call back onto a listener that the transport provides.   Jetty
>>>>> >>>> > would
>>>>> >>>> > just call the jetty resume stuff. JMS would probably put a
>>>>> >>>> > runnable
>>>>> >>>> > on the workqueue to restart the chain.
>>>>> >>>>
>>>>> >>>> ok
>>>>> >>>>
>>>>> >>>> > Also, suspend() would need to check if there is a listener.  If
>>>>> >>>> > not,
>>>>> >>>> > it should not throw the exception.   Thus, the servlet transport
>>>>> >>>> > and
>>>>> >>>> > CORBA stuff that couldn't do this would pretty much just ignore
>>>>> >>>> > it.
>>>>> >>>>
>>>>> >>>> ok, not sure I understand about the listener but I think I see
>>>>> >>>> what
>>>>> >>>> you mean...
>>>>> >>>>
>>>>> >>>> > Basically, this needs to be done in such a way that it CAN work
>>>>> >>>> > for
>>>>> >>>> > the non-jetty cases.   However, it also needs to be done in a
>>>>> >>>> > way
>>>>> >>>> > that doesn't affect existing transports.
>>>>> >>>>
>>>>> >>>> +1
>>>>> >>>>
>>>>> >>>> Cheers, Sergey
>>>>> >>>>
>>>>> >>>> > Dan
>>>>> >>>> >
>>>>> >>>> >> 2. Now, if the above can be figured out, the next problem
>>>>> >>>> >> arises:
>>>>> >>>> >> when the "trigger" to wake up the continuation occurs
>>>>> >>>> >>
>>>>> >>>> >> I think we can can do in JettyDestination omething similar to
>>>>> >>>> >> what
>>>>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>>>>> >>>> >> extract from it the original continuation instance or else we
>>>>> >>>> >> can
>>>>> >>>> >> do ContinuationSupport.getContinuation(request) which should
>>>>> >>>> >> return
>>>>> >>>> >> us the instance. At this point we can use it as a ket to store
>>>>> >>>> >> the
>>>>> >>>> >> current exchange plus all the other info we may need.
>>>>> >>>> >>
>>>>> >>>> >> When the user/application code does continuation.resume(), the
>>>>> >>>> >> Jetty thread will come back and we will use the
>>>>> >>>> >> ContinuationSupport.getContinuation(request) to get us the
>>>>> >>>> >> active
>>>>> >>>> >> continuation and use it to extract the suspended exchange and
>>>>> >>>> >> proceed from there, say we'll call
>>>>> >>>> >> PhaseInterceptorPhase.resume(),
>>>>> >>>> >> etc, something along the lines you suggested
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty
>>>>> >>>> >> much
>>>>> >>>> >> everything to make sure nothing is stored on the stack and is
>>>>> >>>> >> "resumable". Once that is done, the rest is relatively easy.
>>>>> >>>> >>
>>>>> >>>> >> Yea - probably can be the quite challenging
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >> Thoughts ?
>>>>> >>>> >>
>>>>> >>>> >> Cheers, Sergey
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>>>> >>>> >> [3]
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
>>>>> >>>> >>42361 #ac tion_12642361
>>>>> >>>> >
>>>>> >>>> > --
>>>>> >>>> > Daniel Kulp
>>>>> >>>> > dkulp@apache.org
>>>>> >>>> > http://dankulp.com/blog
>>>>> >>>
>>>>> >>> --
>>>>> >>> Daniel Kulp
>>>>> >>> dkulp@apache.org
>>>>> >>> http://dankulp.com/blog
>>>>
>>>>
>>>>
>>>> --
>>>> Daniel Kulp
>>>> dkulp@apache.org
>>>> http://dankulp.com/blog
>>>
>>>
>>
>>
>>
>> --
>> Cheers,
>> Guillaume Nodet
>> ------------------------
>> Blog: http://gnodet.blogspot.com/
>> ------------------------
>> Open Source SOA
>> http://fusesource.com
>
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com

Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
Hi,

I have had a look. At the moment I don't see why we would have to do this sort of sophisticated handling of continuations in CXF 
JettyDestination. With CXF, it's the the code being invoked further down the line (be it SMX CXF binding components or application 
code) which needs to worry about doing either suspending or resuming continuations.

As far as CXF is concerned, it only needs to be able to associate a given inbound message with a continuation instance. I reckon 
saving it as a continuation user object (preserving the previously set one if any) is a lighter/simpler alternative than introducing 
maps in the JettyDestination.

However, as I said few times earlier in this thread, there's a race condition which I observe in certain conditions. Specifically, I 
have a test where a continuation is resumed virtually immediately after it's been suspended so before the code dealing with 
associating this suspended continuation with the inbound message has a chance to do it, the continuation.resume() has already 
occured. In CXF case I believe it can happen irrespectively of how we write the code dealing with continuations under the hood. It 
won't happen if continuation wrappers are used by the application code.

Do you have any comments about this race condition ? Or how a code you linked to can help to avoid it ?

Cheers, Sergey




>I would really encourage you to take a look at the smx code for
> handling continuations.
> We've had quite a hard time to handle race conditions, timeouts etc...
> because the continuation has a timeout and when the message is
> received back around the timeout, things can become a bit tricky.
>
> https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
>
> We use one concurrent hash map to associate a message id to a
> continuation and multiple synchronization blocks on the continuation
> itself.
> Also the above code can be used with standard servlet servers (i.e.
> when the continuation is a blocking continuation) which is imho a good
> thing.
>
> On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin
> <se...@progress.com> wrote:
>> Hi
>>
>>>>
>>>> I have 10 threads involved, 5 control ones + 5 application ones, I see a
>>>> loss of message approximately once in 5 cases. The fact that
>>>> cont.resume()
>>>> is done virtually immediately after cont.suspend() can explain it.
>>>
>>> Without seeing your code, I cannot really offer valid suggestions, but
>>> I'll
>>> try....   :-)
>>
>> I guess having it all on a branch would be handy then :-)
>>
>>>
>>> One thought was in the Continuation object, record if "resume()" has been
>>> called and if it's been callled by the time the stack unwinds back into
>>> the
>>> Http transport, just re-dispatch immediately.   Either that or have the
>>> resume block until the http transport sets a "ready to resume" flag just
>>> before it allows the exception to flow back into jetty.
>>
>> I have 2 tests.
>>
>> In one test an application server code interacts with a wrapper, both when
>> getting a continuation instance and when calling suspend/resume on it (as
>> suggested by yourself earlier in this thread). In this case, under the hood,
>> an inbound message is associated with a continuation instance before
>> suspend() is called on it. Thus even if the resulting exception does not
>> reach Jetty Destination in time before continuation.resume() is called by a
>> control thread, the message is not lost when the HTTP request is resumed as
>> that HTTP request had this continuation instance associated with it at a
>> time ContinuationsSupport.getContinuations(request) was called.
>>
>> In other test which I believe represents an integration scenario with SMX
>> better, an application server code calls Jetty
>> ContinuationsSupport.getContinuations(request) followed by
>> continuation.suspend(). Now, in this case, before a (Jetty RetryRequest)
>> runtime exception reaches a catch block in AbstractInvoker (where I try to
>> associate a message with continuation), one or two control threads manage to
>> squeeze in and call resume() before catch block has even been processed. So
>> by the time the wrapped exception reaches JettyDestination a request with a
>> resumed continuation has already come back...
>>
>> Does this explanation for a second case and the associated race condition
>> sounds reasonable ?
>>
>> Cheers, Sergey
>>
>>
>>
>>
>>
>>>
>>>
>>> Dan
>>>
>>>>
>>>> Cheers, Sergey
>>>>
>>>> > That said, I'm now trying to inject a message as a custom continuation
>>>> > object (while preserving the original one if any, both ways) as early
>>>> > as
>>>> > possible, in AbstractInvoker, so the time window at which the race
>>>> > condition I talked about earlier can cause the loss of the original
>>>> > message, is extremely small the time it taked for the
>>>> > continuation.suspend() exception to reach a catch block in
>>>> > AbstractInvoker.
>>>> >
>>>> > Cheers, Sergey
>>>> >
>>>> >> Hi,
>>>> >>
>>>> >> I did some system testing with Jetty continuations and it's going not
>>>> >> too bad. Here's one issue which I've encountered which might or might
>>>> >> not be a problem in cases where continuations are ustilized directly
>>>> >> (that is without our wrappers), as in case of say ServiceMix CXF
>>>> >> binding
>>>> >> component.
>>>> >>
>>>> >> The problem is that when continuation.suspend(timeout) has been
>>>> >> called,
>>>> >> a resulting RuntimeException might not reach CXF JettyDestination
>>>> >> (such
>>>> >> that the original message with its phase chain can be preserved until
>>>> >> the request is resumed) if some other application thread calls
>>>> >> continuation.resume() or continuation suspend timeout expires.
>>>> >>
>>>> >> In case of ServiceMix the latter is a theoretical possibility at the
>>>> >> least. I can see in its code this timeout is configured, but if this
>>>> >> timeout is in the region of up to 1 sec or so then it's feasible that
>>>> >> with a heavy  workload the race condition described above might come
>>>> >> to
>>>> >> life.
>>>> >>
>>>> >> That said, as part of my test, I found that even when such condition
>>>> >> occurs, the 'worst' thing which can happen is that a new message and a
>>>> >> new chain are created, that is, the request is not resumed from a
>>>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
>>>> >> request alltogether, but it all works nonetheless, as all the stack
>>>> >> variables used in various interceptors in my given test at least are
>>>> >> all
>>>> >> obtained from a message. The only downside is that that the work which
>>>> >> has already been done earlier as part of handling the suspended
>>>> >> request
>>>> >> is repeated again by the interceptors. It can cause issues though in
>>>> >> cases when some interceptors have sideeffects as part of handling a
>>>> >> given input request, say modify a db, etc
>>>> >>
>>>> >> Now, this race condition can be safely avoided if a wrapper proposed
>>>> >> by
>>>> >> Dan is used by a server application code as the message can be
>>>> >> preserved
>>>> >> immediately at a point a user calls suspend on our wrapper, so without
>>>> >> further doubts I've prototyped it too. It's not possible for SMX
>>>> >> components though
>>>> >>
>>>> >> Comments ?
>>>> >>
>>>> >> Cheers, Sergey
>>>> >>
>>>> >>> I guess my thinking was to tie the continutations directly to the
>>>> >>> PhaseInterceptorChain (since that is going to need to know about them
>>>> >>> anyway).   However, I suppose it could easily be done with a new
>>>> >>> interface. Probably the best thing to do is to stub out a sample
>>>> >>> usecase.   So here goes.....
>>>> >>>
>>>> >>> Lets take a "GreetMe" web service that in the greetMe method will
>>>> >>> call
>>>> >>> off asynchrously to some JMS service to actually get the result.
>>>> >>>
>>>> >>> @Resource(name = "jmsClient")
>>>> >>> Greeter jmsGreeter
>>>> >>> @Resource
>>>> >>> WebServiceContext context;
>>>> >>> public String greetMe(String arg) {
>>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>>> >>>              context.get(ContinuationSupport.class.getName());
>>>> >>>     if (contSupport == null) {
>>>> >>>          //continuations not supported, must wait
>>>> >>>          return jmsGreeter.greetMe(arg);
>>>> >>>     }
>>>> >>>     Continuation cont = contSupport.getContinuation();
>>>> >>>     if (cont.isResumed()) {
>>>> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>>>> >>>        return handler.get().getReturn();
>>>> >>>     } else {
>>>> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>>> >>>         cont.suspend(handler);
>>>> >>> return null;   //won't actually get here as suspend will throw a
>>>> >>> ContinuationException
>>>> >>>     }
>>>> >>> }
>>>> >>>
>>>> >>> The Handler would look something like:
>>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>>> >>> GreetMeResponse resp;
>>>> >>>        Continuation cont;
>>>> >>> public Handler(Continuation cont) {
>>>> >>>            this.cont = cont;
>>>> >>>        }
>>>> >>>        public void handleResponse(Response<GreetMeLaterResponse>
>>>> >>> response) { resp = response.get();
>>>> >>>              cont.resume();
>>>> >>>       }
>>>> >>> }
>>>> >>>
>>>> >>> Basically, the HTTP/Jetty transport could provide an implementation
>>>> >>> of
>>>> >>> ContinuationSupport that wrappers the jetty stuff.    JMS could
>>>> >>> provide
>>>> >>> one that's pretty much a null op.   Transports that cannot support it
>>>> >>> (like servlet) just wouldn't provide an implementation.
>>>> >>>
>>>> >>>
>>>> >>> Does that make sense?   Other ideas?
>>>> >>>
>>>> >>> Dan
>>>> >>>
>>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>>> >>>> > No.   We don't want that.   Whatever we do should work for other
>>>> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to
>>>> >>>> > jetty
>>>> >>>> > continuations directly.
>>>> >>>>
>>>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
>>>> >>>> Ex.
>>>> >>>>
>>>> >>>> try {
>>>> >>>>   invoke(); // continuation.suspend() somehow by the code being
>>>> >>>> invoked upon }
>>>> >>>> catch (RuntimeException ex) {
>>>> >>>>
>>>> >>>> if
>>>> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>>> >>>> throw new SuspendedFault(ex);
>>>> >>>>     // or PhaseInterceptorChain.suspend()
>>>> >>>> }
>>>> >>>> }
>>>> >>>>
>>>> >>>> > Most likely, we could add a "suspend()" method to
>>>> >>>> > PhaseInterceptorChain that would do something very similar and
>>>> >>>> > throw
>>>> >>>> > a "SuspendException" or something in the same package as
>>>> >>>> > PhaseInterceptorChain.
>>>> >>>>
>>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call though
>>>> >>>> ?
>>>> >>>>
>>>> >>>> >   That would get propogated
>>>> >>>> > back to the JettyDestination that could then call the jetty
>>>> >>>> > things.
>>>> >>>> >  The JMS transport could just catch it and more or less ignore it.
>>>> >>>> >  We'd then have to add a "resume()" method to the chain which
>>>> >>>> > would
>>>> >>>> > call back onto a listener that the transport provides.   Jetty
>>>> >>>> > would
>>>> >>>> > just call the jetty resume stuff. JMS would probably put a
>>>> >>>> > runnable
>>>> >>>> > on the workqueue to restart the chain.
>>>> >>>>
>>>> >>>> ok
>>>> >>>>
>>>> >>>> > Also, suspend() would need to check if there is a listener.  If
>>>> >>>> > not,
>>>> >>>> > it should not throw the exception.   Thus, the servlet transport
>>>> >>>> > and
>>>> >>>> > CORBA stuff that couldn't do this would pretty much just ignore
>>>> >>>> > it.
>>>> >>>>
>>>> >>>> ok, not sure I understand about the listener but I think I see what
>>>> >>>> you mean...
>>>> >>>>
>>>> >>>> > Basically, this needs to be done in such a way that it CAN work
>>>> >>>> > for
>>>> >>>> > the non-jetty cases.   However, it also needs to be done in a way
>>>> >>>> > that doesn't affect existing transports.
>>>> >>>>
>>>> >>>> +1
>>>> >>>>
>>>> >>>> Cheers, Sergey
>>>> >>>>
>>>> >>>> > Dan
>>>> >>>> >
>>>> >>>> >> 2. Now, if the above can be figured out, the next problem arises:
>>>> >>>> >> when the "trigger" to wake up the continuation occurs
>>>> >>>> >>
>>>> >>>> >> I think we can can do in JettyDestination omething similar to
>>>> >>>> >> what
>>>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>>>> >>>> >> extract from it the original continuation instance or else we can
>>>> >>>> >> do ContinuationSupport.getContinuation(request) which should
>>>> >>>> >> return
>>>> >>>> >> us the instance. At this point we can use it as a ket to store
>>>> >>>> >> the
>>>> >>>> >> current exchange plus all the other info we may need.
>>>> >>>> >>
>>>> >>>> >> When the user/application code does continuation.resume(), the
>>>> >>>> >> Jetty thread will come back and we will use the
>>>> >>>> >> ContinuationSupport.getContinuation(request) to get us the active
>>>> >>>> >> continuation and use it to extract the suspended exchange and
>>>> >>>> >> proceed from there, say we'll call
>>>> >>>> >> PhaseInterceptorPhase.resume(),
>>>> >>>> >> etc, something along the lines you suggested
>>>> >>>> >>
>>>> >>>> >>
>>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>>>> >>>> >> everything to make sure nothing is stored on the stack and is
>>>> >>>> >> "resumable". Once that is done, the rest is relatively easy.
>>>> >>>> >>
>>>> >>>> >> Yea - probably can be the quite challenging
>>>> >>>> >>
>>>> >>>> >>
>>>> >>>> >> Thoughts ?
>>>> >>>> >>
>>>> >>>> >> Cheers, Sergey
>>>> >>>> >>
>>>> >>>> >>
>>>> >>>> >>
>>>> >>>> >>
>>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>>> >>>> >> [3]
>>>> >>>> >>
>>>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
>>>> >>>> >>42361 #ac tion_12642361
>>>> >>>> >
>>>> >>>> > --
>>>> >>>> > Daniel Kulp
>>>> >>>> > dkulp@apache.org
>>>> >>>> > http://dankulp.com/blog
>>>> >>>
>>>> >>> --
>>>> >>> Daniel Kulp
>>>> >>> dkulp@apache.org
>>>> >>> http://dankulp.com/blog
>>>
>>>
>>>
>>> --
>>> Daniel Kulp
>>> dkulp@apache.org
>>> http://dankulp.com/blog
>>
>>
>
>
>
> -- 
> Cheers,
> Guillaume Nodet
> ------------------------
> Blog: http://gnodet.blogspot.com/
> ------------------------
> Open Source SOA
> http://fusesource.com 


Re: Jetty Continuations in CXF

Posted by Guillaume Nodet <gn...@gmail.com>.
I would really encourage you to take a look at the smx code for
handling continuations.
We've had quite a hard time to handle race conditions, timeouts etc...
because the continuation has a timeout and when the message is
received back around the timeout, things can become a bit tricky.

https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java

We use one concurrent hash map to associate a message id to a
continuation and multiple synchronization blocks on the continuation
itself.
Also the above code can be used with standard servlet servers (i.e.
when the continuation is a blocking continuation) which is imho a good
thing.

On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin
<se...@progress.com> wrote:
> Hi
>
>>>
>>> I have 10 threads involved, 5 control ones + 5 application ones, I see a
>>> loss of message approximately once in 5 cases. The fact that
>>> cont.resume()
>>> is done virtually immediately after cont.suspend() can explain it.
>>
>> Without seeing your code, I cannot really offer valid suggestions, but
>> I'll
>> try....   :-)
>
> I guess having it all on a branch would be handy then :-)
>
>>
>> One thought was in the Continuation object, record if "resume()" has been
>> called and if it's been callled by the time the stack unwinds back into
>> the
>> Http transport, just re-dispatch immediately.   Either that or have the
>> resume block until the http transport sets a "ready to resume" flag just
>> before it allows the exception to flow back into jetty.
>
> I have 2 tests.
>
> In one test an application server code interacts with a wrapper, both when
> getting a continuation instance and when calling suspend/resume on it (as
> suggested by yourself earlier in this thread). In this case, under the hood,
> an inbound message is associated with a continuation instance before
> suspend() is called on it. Thus even if the resulting exception does not
> reach Jetty Destination in time before continuation.resume() is called by a
> control thread, the message is not lost when the HTTP request is resumed as
> that HTTP request had this continuation instance associated with it at a
> time ContinuationsSupport.getContinuations(request) was called.
>
> In other test which I believe represents an integration scenario with SMX
> better, an application server code calls Jetty
> ContinuationsSupport.getContinuations(request) followed by
> continuation.suspend(). Now, in this case, before a (Jetty RetryRequest)
> runtime exception reaches a catch block in AbstractInvoker (where I try to
> associate a message with continuation), one or two control threads manage to
> squeeze in and call resume() before catch block has even been processed. So
> by the time the wrapped exception reaches JettyDestination a request with a
> resumed continuation has already come back...
>
> Does this explanation for a second case and the associated race condition
> sounds reasonable ?
>
> Cheers, Sergey
>
>
>
>
>
>>
>>
>> Dan
>>
>>>
>>> Cheers, Sergey
>>>
>>> > That said, I'm now trying to inject a message as a custom continuation
>>> > object (while preserving the original one if any, both ways) as early
>>> > as
>>> > possible, in AbstractInvoker, so the time window at which the race
>>> > condition I talked about earlier can cause the loss of the original
>>> > message, is extremely small the time it taked for the
>>> > continuation.suspend() exception to reach a catch block in
>>> > AbstractInvoker.
>>> >
>>> > Cheers, Sergey
>>> >
>>> >> Hi,
>>> >>
>>> >> I did some system testing with Jetty continuations and it's going not
>>> >> too bad. Here's one issue which I've encountered which might or might
>>> >> not be a problem in cases where continuations are ustilized directly
>>> >> (that is without our wrappers), as in case of say ServiceMix CXF
>>> >> binding
>>> >> component.
>>> >>
>>> >> The problem is that when continuation.suspend(timeout) has been
>>> >> called,
>>> >> a resulting RuntimeException might not reach CXF JettyDestination
>>> >> (such
>>> >> that the original message with its phase chain can be preserved until
>>> >> the request is resumed) if some other application thread calls
>>> >> continuation.resume() or continuation suspend timeout expires.
>>> >>
>>> >> In case of ServiceMix the latter is a theoretical possibility at the
>>> >> least. I can see in its code this timeout is configured, but if this
>>> >> timeout is in the region of up to 1 sec or so then it's feasible that
>>> >> with a heavy  workload the race condition described above might come
>>> >> to
>>> >> life.
>>> >>
>>> >> That said, as part of my test, I found that even when such condition
>>> >> occurs, the 'worst' thing which can happen is that a new message and a
>>> >> new chain are created, that is, the request is not resumed from a
>>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
>>> >> request alltogether, but it all works nonetheless, as all the stack
>>> >> variables used in various interceptors in my given test at least are
>>> >> all
>>> >> obtained from a message. The only downside is that that the work which
>>> >> has already been done earlier as part of handling the suspended
>>> >> request
>>> >> is repeated again by the interceptors. It can cause issues though in
>>> >> cases when some interceptors have sideeffects as part of handling a
>>> >> given input request, say modify a db, etc
>>> >>
>>> >> Now, this race condition can be safely avoided if a wrapper proposed
>>> >> by
>>> >> Dan is used by a server application code as the message can be
>>> >> preserved
>>> >> immediately at a point a user calls suspend on our wrapper, so without
>>> >> further doubts I've prototyped it too. It's not possible for SMX
>>> >> components though
>>> >>
>>> >> Comments ?
>>> >>
>>> >> Cheers, Sergey
>>> >>
>>> >>> I guess my thinking was to tie the continutations directly to the
>>> >>> PhaseInterceptorChain (since that is going to need to know about them
>>> >>> anyway).   However, I suppose it could easily be done with a new
>>> >>> interface. Probably the best thing to do is to stub out a sample
>>> >>> usecase.   So here goes.....
>>> >>>
>>> >>> Lets take a "GreetMe" web service that in the greetMe method will
>>> >>> call
>>> >>> off asynchrously to some JMS service to actually get the result.
>>> >>>
>>> >>> @Resource(name = "jmsClient")
>>> >>> Greeter jmsGreeter
>>> >>> @Resource
>>> >>> WebServiceContext context;
>>> >>> public String greetMe(String arg) {
>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>> >>>              context.get(ContinuationSupport.class.getName());
>>> >>>     if (contSupport == null) {
>>> >>>          //continuations not supported, must wait
>>> >>>          return jmsGreeter.greetMe(arg);
>>> >>>     }
>>> >>>     Continuation cont = contSupport.getContinuation();
>>> >>>     if (cont.isResumed()) {
>>> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>>> >>>        return handler.get().getReturn();
>>> >>>     } else {
>>> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>> >>>         cont.suspend(handler);
>>> >>> return null;   //won't actually get here as suspend will throw a
>>> >>> ContinuationException
>>> >>>     }
>>> >>> }
>>> >>>
>>> >>> The Handler would look something like:
>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>> >>> GreetMeResponse resp;
>>> >>>        Continuation cont;
>>> >>> public Handler(Continuation cont) {
>>> >>>            this.cont = cont;
>>> >>>        }
>>> >>>        public void handleResponse(Response<GreetMeLaterResponse>
>>> >>> response) { resp = response.get();
>>> >>>              cont.resume();
>>> >>>       }
>>> >>> }
>>> >>>
>>> >>> Basically, the HTTP/Jetty transport could provide an implementation
>>> >>> of
>>> >>> ContinuationSupport that wrappers the jetty stuff.    JMS could
>>> >>> provide
>>> >>> one that's pretty much a null op.   Transports that cannot support it
>>> >>> (like servlet) just wouldn't provide an implementation.
>>> >>>
>>> >>>
>>> >>> Does that make sense?   Other ideas?
>>> >>>
>>> >>> Dan
>>> >>>
>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>> >>>> > No.   We don't want that.   Whatever we do should work for other
>>> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to
>>> >>>> > jetty
>>> >>>> > continuations directly.
>>> >>>>
>>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
>>> >>>> Ex.
>>> >>>>
>>> >>>> try {
>>> >>>>   invoke(); // continuation.suspend() somehow by the code being
>>> >>>> invoked upon }
>>> >>>> catch (RuntimeException ex) {
>>> >>>>
>>> >>>> if
>>> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>> >>>> throw new SuspendedFault(ex);
>>> >>>>     // or PhaseInterceptorChain.suspend()
>>> >>>> }
>>> >>>> }
>>> >>>>
>>> >>>> > Most likely, we could add a "suspend()" method to
>>> >>>> > PhaseInterceptorChain that would do something very similar and
>>> >>>> > throw
>>> >>>> > a "SuspendException" or something in the same package as
>>> >>>> > PhaseInterceptorChain.
>>> >>>>
>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call though
>>> >>>> ?
>>> >>>>
>>> >>>> >   That would get propogated
>>> >>>> > back to the JettyDestination that could then call the jetty
>>> >>>> > things.
>>> >>>> >  The JMS transport could just catch it and more or less ignore it.
>>> >>>> >  We'd then have to add a "resume()" method to the chain which
>>> >>>> > would
>>> >>>> > call back onto a listener that the transport provides.   Jetty
>>> >>>> > would
>>> >>>> > just call the jetty resume stuff. JMS would probably put a
>>> >>>> > runnable
>>> >>>> > on the workqueue to restart the chain.
>>> >>>>
>>> >>>> ok
>>> >>>>
>>> >>>> > Also, suspend() would need to check if there is a listener.  If
>>> >>>> > not,
>>> >>>> > it should not throw the exception.   Thus, the servlet transport
>>> >>>> > and
>>> >>>> > CORBA stuff that couldn't do this would pretty much just ignore
>>> >>>> > it.
>>> >>>>
>>> >>>> ok, not sure I understand about the listener but I think I see what
>>> >>>> you mean...
>>> >>>>
>>> >>>> > Basically, this needs to be done in such a way that it CAN work
>>> >>>> > for
>>> >>>> > the non-jetty cases.   However, it also needs to be done in a way
>>> >>>> > that doesn't affect existing transports.
>>> >>>>
>>> >>>> +1
>>> >>>>
>>> >>>> Cheers, Sergey
>>> >>>>
>>> >>>> > Dan
>>> >>>> >
>>> >>>> >> 2. Now, if the above can be figured out, the next problem arises:
>>> >>>> >> when the "trigger" to wake up the continuation occurs
>>> >>>> >>
>>> >>>> >> I think we can can do in JettyDestination omething similar to
>>> >>>> >> what
>>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>>> >>>> >> extract from it the original continuation instance or else we can
>>> >>>> >> do ContinuationSupport.getContinuation(request) which should
>>> >>>> >> return
>>> >>>> >> us the instance. At this point we can use it as a ket to store
>>> >>>> >> the
>>> >>>> >> current exchange plus all the other info we may need.
>>> >>>> >>
>>> >>>> >> When the user/application code does continuation.resume(), the
>>> >>>> >> Jetty thread will come back and we will use the
>>> >>>> >> ContinuationSupport.getContinuation(request) to get us the active
>>> >>>> >> continuation and use it to extract the suspended exchange and
>>> >>>> >> proceed from there, say we'll call
>>> >>>> >> PhaseInterceptorPhase.resume(),
>>> >>>> >> etc, something along the lines you suggested
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>>> >>>> >> everything to make sure nothing is stored on the stack and is
>>> >>>> >> "resumable". Once that is done, the rest is relatively easy.
>>> >>>> >>
>>> >>>> >> Yea - probably can be the quite challenging
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> Thoughts ?
>>> >>>> >>
>>> >>>> >> Cheers, Sergey
>>> >>>> >>
>>> >>>> >>
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>> >>>> >> [3]
>>> >>>> >>
>>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
>>> >>>> >>42361 #ac tion_12642361
>>> >>>> >
>>> >>>> > --
>>> >>>> > Daniel Kulp
>>> >>>> > dkulp@apache.org
>>> >>>> > http://dankulp.com/blog
>>> >>>
>>> >>> --
>>> >>> Daniel Kulp
>>> >>> dkulp@apache.org
>>> >>> http://dankulp.com/blog
>>
>>
>>
>> --
>> Daniel Kulp
>> dkulp@apache.org
>> http://dankulp.com/blog
>
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com

NPE in system JMS test

Posted by Sergey Beryozkin <se...@progress.com>.
Hi, seeing this NPE with the latest trunk :

 T E S T S
-------------------------------------------------------
Running org.apache.cxf.systest.multitransport.MultiTransportClientServerTest
Exception in thread "DefaultMessageListenerContainer-1" java.lang.NullPointerExc
eption
        at java.lang.String.indexOf(String.java:1564)
        at java.lang.String.indexOf(String.java:1546)
        at org.springframework.jms.support.JmsUtils.buildExceptionMessage(JmsUti
ls.java:255)
        at org.springframework.jms.listener.DefaultMessageListenerContainer.refr
eshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:799)
        at org.springframework.jms.listener.DefaultMessageListenerContainer.reco
verAfterListenerSetupFailure(DefaultMessageListenerContainer.java:767)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$Asyn
cMessageListenerInvoker.run(DefaultMessageListenerContainer.java:898)
        at java.lang.Thread.run(Thread.java:595)
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 22.425 sec


looks like it's swallowed - can someone else see it ?

Sergey


----- Original Message ----- 
From: "Sergey Beryozkin" <se...@progress.com>
To: "Daniel Kulp" <dk...@apache.org>; <de...@cxf.apache.org>
Sent: Tuesday, November 11, 2008 5:51 PM
Subject: Re: Jetty Continuations in CXF


> Hi
>
>>>
>>> I have 10 threads involved, 5 control ones + 5 application ones, I see a
>>> loss of message approximately once in 5 cases. The fact that cont.resume()
>>> is done virtually immediately after cont.suspend() can explain it.
>>
>> Without seeing your code, I cannot really offer valid suggestions, but I'll
>> try....   :-)
>
> I guess having it all on a branch would be handy then :-)
>
>>
>> One thought was in the Continuation object, record if "resume()" has been
>> called and if it's been callled by the time the stack unwinds back into the
>> Http transport, just re-dispatch immediately.   Either that or have the
>> resume block until the http transport sets a "ready to resume" flag just
>> before it allows the exception to flow back into jetty.
>
> I have 2 tests.
>
> In one test an application server code interacts with a wrapper, both when getting a continuation instance and when calling 
> suspend/resume on it (as suggested by yourself earlier in this thread). In this case, under the hood, an inbound message is 
> associated with a continuation instance before suspend() is called on it. Thus even if the resulting exception does not reach 
> Jetty Destination in time before continuation.resume() is called by a control thread, the message is not lost when the HTTP 
> request is resumed as that HTTP request had this continuation instance associated with it at a time 
> ContinuationsSupport.getContinuations(request) was called.
>
> In other test which I believe represents an integration scenario with SMX better, an application server code calls Jetty 
> ContinuationsSupport.getContinuations(request) followed by continuation.suspend(). Now, in this case, before a (Jetty 
> RetryRequest) runtime exception reaches a catch block in AbstractInvoker (where I try to associate a message with continuation), 
> one or two control threads manage to squeeze in and call resume() before catch block has even been processed. So by the time the 
> wrapped exception reaches JettyDestination a request with a resumed continuation has already come back...
>
> Does this explanation for a second case and the associated race condition sounds reasonable ?
>
> Cheers, Sergey
>
>
>
>
>
>>
>>
>> Dan
>>
>>>
>>> Cheers, Sergey
>>>
>>> > That said, I'm now trying to inject a message as a custom continuation
>>> > object (while preserving the original one if any, both ways) as early as
>>> > possible, in AbstractInvoker, so the time window at which the race
>>> > condition I talked about earlier can cause the loss of the original
>>> > message, is extremely small the time it taked for the
>>> > continuation.suspend() exception to reach a catch block in
>>> > AbstractInvoker.
>>> >
>>> > Cheers, Sergey
>>> >
>>> >> Hi,
>>> >>
>>> >> I did some system testing with Jetty continuations and it's going not
>>> >> too bad. Here's one issue which I've encountered which might or might
>>> >> not be a problem in cases where continuations are ustilized directly
>>> >> (that is without our wrappers), as in case of say ServiceMix CXF binding
>>> >> component.
>>> >>
>>> >> The problem is that when continuation.suspend(timeout) has been called,
>>> >> a resulting RuntimeException might not reach CXF JettyDestination (such
>>> >> that the original message with its phase chain can be preserved until
>>> >> the request is resumed) if some other application thread calls
>>> >> continuation.resume() or continuation suspend timeout expires.
>>> >>
>>> >> In case of ServiceMix the latter is a theoretical possibility at the
>>> >> least. I can see in its code this timeout is configured, but if this
>>> >> timeout is in the region of up to 1 sec or so then it's feasible that
>>> >> with a heavy  workload the race condition described above might come to
>>> >> life.
>>> >>
>>> >> That said, as part of my test, I found that even when such condition
>>> >> occurs, the 'worst' thing which can happen is that a new message and a
>>> >> new chain are created, that is, the request is not resumed from a
>>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
>>> >> request alltogether, but it all works nonetheless, as all the stack
>>> >> variables used in various interceptors in my given test at least are all
>>> >> obtained from a message. The only downside is that that the work which
>>> >> has already been done earlier as part of handling the suspended request
>>> >> is repeated again by the interceptors. It can cause issues though in
>>> >> cases when some interceptors have sideeffects as part of handling a
>>> >> given input request, say modify a db, etc
>>> >>
>>> >> Now, this race condition can be safely avoided if a wrapper proposed by
>>> >> Dan is used by a server application code as the message can be preserved
>>> >> immediately at a point a user calls suspend on our wrapper, so without
>>> >> further doubts I've prototyped it too. It's not possible for SMX
>>> >> components though
>>> >>
>>> >> Comments ?
>>> >>
>>> >> Cheers, Sergey
>>> >>
>>> >>> I guess my thinking was to tie the continutations directly to the
>>> >>> PhaseInterceptorChain (since that is going to need to know about them
>>> >>> anyway).   However, I suppose it could easily be done with a new
>>> >>> interface. Probably the best thing to do is to stub out a sample
>>> >>> usecase.   So here goes.....
>>> >>>
>>> >>> Lets take a "GreetMe" web service that in the greetMe method will call
>>> >>> off asynchrously to some JMS service to actually get the result.
>>> >>>
>>> >>> @Resource(name = "jmsClient")
>>> >>> Greeter jmsGreeter
>>> >>> @Resource
>>> >>> WebServiceContext context;
>>> >>> public String greetMe(String arg) {
>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>> >>>              context.get(ContinuationSupport.class.getName());
>>> >>>     if (contSupport == null) {
>>> >>>          //continuations not supported, must wait
>>> >>>          return jmsGreeter.greetMe(arg);
>>> >>>     }
>>> >>>     Continuation cont = contSupport.getContinuation();
>>> >>>     if (cont.isResumed()) {
>>> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>>> >>>        return handler.get().getReturn();
>>> >>>     } else {
>>> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>> >>>         cont.suspend(handler);
>>> >>> return null;   //won't actually get here as suspend will throw a
>>> >>> ContinuationException
>>> >>>     }
>>> >>> }
>>> >>>
>>> >>> The Handler would look something like:
>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>> >>> GreetMeResponse resp;
>>> >>>        Continuation cont;
>>> >>> public Handler(Continuation cont) {
>>> >>>            this.cont = cont;
>>> >>>        }
>>> >>>        public void handleResponse(Response<GreetMeLaterResponse>
>>> >>> response) { resp = response.get();
>>> >>>              cont.resume();
>>> >>>       }
>>> >>> }
>>> >>>
>>> >>> Basically, the HTTP/Jetty transport could provide an implementation of
>>> >>> ContinuationSupport that wrappers the jetty stuff.    JMS could provide
>>> >>> one that's pretty much a null op.   Transports that cannot support it
>>> >>> (like servlet) just wouldn't provide an implementation.
>>> >>>
>>> >>>
>>> >>> Does that make sense?   Other ideas?
>>> >>>
>>> >>> Dan
>>> >>>
>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>> >>>> > No.   We don't want that.   Whatever we do should work for other
>>> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to jetty
>>> >>>> > continuations directly.
>>> >>>>
>>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
>>> >>>> Ex.
>>> >>>>
>>> >>>> try {
>>> >>>>   invoke(); // continuation.suspend() somehow by the code being
>>> >>>> invoked upon }
>>> >>>> catch (RuntimeException ex) {
>>> >>>>
>>> >>>> if
>>> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>> >>>> throw new SuspendedFault(ex);
>>> >>>>     // or PhaseInterceptorChain.suspend()
>>> >>>> }
>>> >>>> }
>>> >>>>
>>> >>>> > Most likely, we could add a "suspend()" method to
>>> >>>> > PhaseInterceptorChain that would do something very similar and throw
>>> >>>> > a "SuspendException" or something in the same package as
>>> >>>> > PhaseInterceptorChain.
>>> >>>>
>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call though ?
>>> >>>>
>>> >>>> >   That would get propogated
>>> >>>> > back to the JettyDestination that could then call the jetty things.
>>> >>>> >  The JMS transport could just catch it and more or less ignore it.
>>> >>>> >  We'd then have to add a "resume()" method to the chain which would
>>> >>>> > call back onto a listener that the transport provides.   Jetty would
>>> >>>> > just call the jetty resume stuff. JMS would probably put a runnable
>>> >>>> > on the workqueue to restart the chain.
>>> >>>>
>>> >>>> ok
>>> >>>>
>>> >>>> > Also, suspend() would need to check if there is a listener.  If not,
>>> >>>> > it should not throw the exception.   Thus, the servlet transport and
>>> >>>> > CORBA stuff that couldn't do this would pretty much just ignore it.
>>> >>>>
>>> >>>> ok, not sure I understand about the listener but I think I see what
>>> >>>> you mean...
>>> >>>>
>>> >>>> > Basically, this needs to be done in such a way that it CAN work for
>>> >>>> > the non-jetty cases.   However, it also needs to be done in a way
>>> >>>> > that doesn't affect existing transports.
>>> >>>>
>>> >>>> +1
>>> >>>>
>>> >>>> Cheers, Sergey
>>> >>>>
>>> >>>> > Dan
>>> >>>> >
>>> >>>> >> 2. Now, if the above can be figured out, the next problem arises:
>>> >>>> >> when the "trigger" to wake up the continuation occurs
>>> >>>> >>
>>> >>>> >> I think we can can do in JettyDestination omething similar to what
>>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>>> >>>> >> extract from it the original continuation instance or else we can
>>> >>>> >> do ContinuationSupport.getContinuation(request) which should return
>>> >>>> >> us the instance. At this point we can use it as a ket to store the
>>> >>>> >> current exchange plus all the other info we may need.
>>> >>>> >>
>>> >>>> >> When the user/application code does continuation.resume(), the
>>> >>>> >> Jetty thread will come back and we will use the
>>> >>>> >> ContinuationSupport.getContinuation(request) to get us the active
>>> >>>> >> continuation and use it to extract the suspended exchange and
>>> >>>> >> proceed from there, say we'll call PhaseInterceptorPhase.resume(),
>>> >>>> >> etc, something along the lines you suggested
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>>> >>>> >> everything to make sure nothing is stored on the stack and is
>>> >>>> >> "resumable". Once that is done, the rest is relatively easy.
>>> >>>> >>
>>> >>>> >> Yea - probably can be the quite challenging
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> Thoughts ?
>>> >>>> >>
>>> >>>> >> Cheers, Sergey
>>> >>>> >>
>>> >>>> >>
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>> >>>> >> [3]
>>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
>>> >>>> >>42361 #ac tion_12642361
>>> >>>> >
>>> >>>> > --
>>> >>>> > Daniel Kulp
>>> >>>> > dkulp@apache.org
>>> >>>> > http://dankulp.com/blog
>>> >>>
>>> >>> --
>>> >>> Daniel Kulp
>>> >>> dkulp@apache.org
>>> >>> http://dankulp.com/blog
>>
>>
>>
>> -- 
>> Daniel Kulp
>> dkulp@apache.org
>> http://dankulp.com/blog
> 


Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
Hi

>>
>> I have 10 threads involved, 5 control ones + 5 application ones, I see a
>> loss of message approximately once in 5 cases. The fact that cont.resume()
>> is done virtually immediately after cont.suspend() can explain it.
>
> Without seeing your code, I cannot really offer valid suggestions, but I'll
> try....   :-)

I guess having it all on a branch would be handy then :-)

>
> One thought was in the Continuation object, record if "resume()" has been
> called and if it's been callled by the time the stack unwinds back into the
> Http transport, just re-dispatch immediately.   Either that or have the
> resume block until the http transport sets a "ready to resume" flag just
> before it allows the exception to flow back into jetty.

I have 2 tests.

In one test an application server code interacts with a wrapper, both when getting a continuation instance and when calling 
suspend/resume on it (as suggested by yourself earlier in this thread). In this case, under the hood, an inbound message is 
associated with a continuation instance before suspend() is called on it. Thus even if the resulting exception does not reach Jetty 
Destination in time before continuation.resume() is called by a control thread, the message is not lost when the HTTP request is 
resumed as that HTTP request had this continuation instance associated with it at a time 
ContinuationsSupport.getContinuations(request) was called.

In other test which I believe represents an integration scenario with SMX better, an application server code calls Jetty 
ContinuationsSupport.getContinuations(request) followed by continuation.suspend(). Now, in this case, before a (Jetty RetryRequest) 
runtime exception reaches a catch block in AbstractInvoker (where I try to associate a message with continuation), one or two 
control threads manage to squeeze in and call resume() before catch block has even been processed. So by the time the wrapped 
exception reaches JettyDestination a request with a resumed continuation has already come back...

Does this explanation for a second case and the associated race condition sounds reasonable ?

Cheers, Sergey





>
>
> Dan
>
>>
>> Cheers, Sergey
>>
>> > That said, I'm now trying to inject a message as a custom continuation
>> > object (while preserving the original one if any, both ways) as early as
>> > possible, in AbstractInvoker, so the time window at which the race
>> > condition I talked about earlier can cause the loss of the original
>> > message, is extremely small the time it taked for the
>> > continuation.suspend() exception to reach a catch block in
>> > AbstractInvoker.
>> >
>> > Cheers, Sergey
>> >
>> >> Hi,
>> >>
>> >> I did some system testing with Jetty continuations and it's going not
>> >> too bad. Here's one issue which I've encountered which might or might
>> >> not be a problem in cases where continuations are ustilized directly
>> >> (that is without our wrappers), as in case of say ServiceMix CXF binding
>> >> component.
>> >>
>> >> The problem is that when continuation.suspend(timeout) has been called,
>> >> a resulting RuntimeException might not reach CXF JettyDestination (such
>> >> that the original message with its phase chain can be preserved until
>> >> the request is resumed) if some other application thread calls
>> >> continuation.resume() or continuation suspend timeout expires.
>> >>
>> >> In case of ServiceMix the latter is a theoretical possibility at the
>> >> least. I can see in its code this timeout is configured, but if this
>> >> timeout is in the region of up to 1 sec or so then it's feasible that
>> >> with a heavy  workload the race condition described above might come to
>> >> life.
>> >>
>> >> That said, as part of my test, I found that even when such condition
>> >> occurs, the 'worst' thing which can happen is that a new message and a
>> >> new chain are created, that is, the request is not resumed from a
>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
>> >> request alltogether, but it all works nonetheless, as all the stack
>> >> variables used in various interceptors in my given test at least are all
>> >> obtained from a message. The only downside is that that the work which
>> >> has already been done earlier as part of handling the suspended request
>> >> is repeated again by the interceptors. It can cause issues though in
>> >> cases when some interceptors have sideeffects as part of handling a
>> >> given input request, say modify a db, etc
>> >>
>> >> Now, this race condition can be safely avoided if a wrapper proposed by
>> >> Dan is used by a server application code as the message can be preserved
>> >> immediately at a point a user calls suspend on our wrapper, so without
>> >> further doubts I've prototyped it too. It's not possible for SMX
>> >> components though
>> >>
>> >> Comments ?
>> >>
>> >> Cheers, Sergey
>> >>
>> >>> I guess my thinking was to tie the continutations directly to the
>> >>> PhaseInterceptorChain (since that is going to need to know about them
>> >>> anyway).   However, I suppose it could easily be done with a new
>> >>> interface. Probably the best thing to do is to stub out a sample
>> >>> usecase.   So here goes.....
>> >>>
>> >>> Lets take a "GreetMe" web service that in the greetMe method will call
>> >>> off asynchrously to some JMS service to actually get the result.
>> >>>
>> >>> @Resource(name = "jmsClient")
>> >>> Greeter jmsGreeter
>> >>> @Resource
>> >>> WebServiceContext context;
>> >>> public String greetMe(String arg) {
>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>> >>>              context.get(ContinuationSupport.class.getName());
>> >>>     if (contSupport == null) {
>> >>>          //continuations not supported, must wait
>> >>>          return jmsGreeter.greetMe(arg);
>> >>>     }
>> >>>     Continuation cont = contSupport.getContinuation();
>> >>>     if (cont.isResumed()) {
>> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>> >>>        return handler.get().getReturn();
>> >>>     } else {
>> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>> >>>         cont.suspend(handler);
>> >>> return null;   //won't actually get here as suspend will throw a
>> >>> ContinuationException
>> >>>     }
>> >>> }
>> >>>
>> >>> The Handler would look something like:
>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>> >>> GreetMeResponse resp;
>> >>>        Continuation cont;
>> >>> public Handler(Continuation cont) {
>> >>>            this.cont = cont;
>> >>>        }
>> >>>        public void handleResponse(Response<GreetMeLaterResponse>
>> >>> response) { resp = response.get();
>> >>>              cont.resume();
>> >>>       }
>> >>> }
>> >>>
>> >>> Basically, the HTTP/Jetty transport could provide an implementation of
>> >>> ContinuationSupport that wrappers the jetty stuff.    JMS could provide
>> >>> one that's pretty much a null op.   Transports that cannot support it
>> >>> (like servlet) just wouldn't provide an implementation.
>> >>>
>> >>>
>> >>> Does that make sense?   Other ideas?
>> >>>
>> >>> Dan
>> >>>
>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>> >>>> > No.   We don't want that.   Whatever we do should work for other
>> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to jetty
>> >>>> > continuations directly.
>> >>>>
>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
>> >>>> Ex.
>> >>>>
>> >>>> try {
>> >>>>   invoke(); // continuation.suspend() somehow by the code being
>> >>>> invoked upon }
>> >>>> catch (RuntimeException ex) {
>> >>>>
>> >>>> if
>> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>> >>>> throw new SuspendedFault(ex);
>> >>>>     // or PhaseInterceptorChain.suspend()
>> >>>> }
>> >>>> }
>> >>>>
>> >>>> > Most likely, we could add a "suspend()" method to
>> >>>> > PhaseInterceptorChain that would do something very similar and throw
>> >>>> > a "SuspendException" or something in the same package as
>> >>>> > PhaseInterceptorChain.
>> >>>>
>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call though ?
>> >>>>
>> >>>> >   That would get propogated
>> >>>> > back to the JettyDestination that could then call the jetty things.
>> >>>> >  The JMS transport could just catch it and more or less ignore it.
>> >>>> >  We'd then have to add a "resume()" method to the chain which would
>> >>>> > call back onto a listener that the transport provides.   Jetty would
>> >>>> > just call the jetty resume stuff. JMS would probably put a runnable
>> >>>> > on the workqueue to restart the chain.
>> >>>>
>> >>>> ok
>> >>>>
>> >>>> > Also, suspend() would need to check if there is a listener.  If not,
>> >>>> > it should not throw the exception.   Thus, the servlet transport and
>> >>>> > CORBA stuff that couldn't do this would pretty much just ignore it.
>> >>>>
>> >>>> ok, not sure I understand about the listener but I think I see what
>> >>>> you mean...
>> >>>>
>> >>>> > Basically, this needs to be done in such a way that it CAN work for
>> >>>> > the non-jetty cases.   However, it also needs to be done in a way
>> >>>> > that doesn't affect existing transports.
>> >>>>
>> >>>> +1
>> >>>>
>> >>>> Cheers, Sergey
>> >>>>
>> >>>> > Dan
>> >>>> >
>> >>>> >> 2. Now, if the above can be figured out, the next problem arises:
>> >>>> >> when the "trigger" to wake up the continuation occurs
>> >>>> >>
>> >>>> >> I think we can can do in JettyDestination omething similar to what
>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>> >>>> >> extract from it the original continuation instance or else we can
>> >>>> >> do ContinuationSupport.getContinuation(request) which should return
>> >>>> >> us the instance. At this point we can use it as a ket to store the
>> >>>> >> current exchange plus all the other info we may need.
>> >>>> >>
>> >>>> >> When the user/application code does continuation.resume(), the
>> >>>> >> Jetty thread will come back and we will use the
>> >>>> >> ContinuationSupport.getContinuation(request) to get us the active
>> >>>> >> continuation and use it to extract the suspended exchange and
>> >>>> >> proceed from there, say we'll call PhaseInterceptorPhase.resume(),
>> >>>> >> etc, something along the lines you suggested
>> >>>> >>
>> >>>> >>
>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>> >>>> >> everything to make sure nothing is stored on the stack and is
>> >>>> >> "resumable". Once that is done, the rest is relatively easy.
>> >>>> >>
>> >>>> >> Yea - probably can be the quite challenging
>> >>>> >>
>> >>>> >>
>> >>>> >> Thoughts ?
>> >>>> >>
>> >>>> >> Cheers, Sergey
>> >>>> >>
>> >>>> >>
>> >>>> >>
>> >>>> >>
>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>> >>>> >> [3]
>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
>> >>>> >>42361 #ac tion_12642361
>> >>>> >
>> >>>> > --
>> >>>> > Daniel Kulp
>> >>>> > dkulp@apache.org
>> >>>> > http://dankulp.com/blog
>> >>>
>> >>> --
>> >>> Daniel Kulp
>> >>> dkulp@apache.org
>> >>> http://dankulp.com/blog
>
>
>
> -- 
> Daniel Kulp
> dkulp@apache.org
> http://dankulp.com/blog 


Re: Jetty Continuations in CXF

Posted by Daniel Kulp <dk...@apache.org>.
On Friday 07 November 2008 8:56:46 am Sergey Beryozkin wrote:
> That said :-), even with this extremely small time window I'm getting log
> warnings. I guess it's due to a nature of my tests. I have contol threads
> waiting in the application code which do continuation.resume() as soon as
> they're notified that continuation.suspend() has occured.
>
> I have 10 threads involved, 5 control ones + 5 application ones, I see a
> loss of message approximately once in 5 cases. The fact that cont.resume()
> is done virtually immediately after cont.suspend() can explain it.

Without seeing your code, I cannot really offer valid suggestions, but I'll 
try....   :-)

One thought was in the Continuation object, record if "resume()" has been 
called and if it's been callled by the time the stack unwinds back into the 
Http transport, just re-dispatch immediately.   Either that or have the 
resume block until the http transport sets a "ready to resume" flag just 
before it allows the exception to flow back into jetty. 


Dan

>
> Cheers, Sergey
>
> > That said, I'm now trying to inject a message as a custom continuation
> > object (while preserving the original one if any, both ways) as early as
> > possible, in AbstractInvoker, so the time window at which the race
> > condition I talked about earlier can cause the loss of the original
> > message, is extremely small the time it taked for the
> > continuation.suspend() exception to reach a catch block in
> > AbstractInvoker.
> >
> > Cheers, Sergey
> >
> >> Hi,
> >>
> >> I did some system testing with Jetty continuations and it's going not
> >> too bad. Here's one issue which I've encountered which might or might
> >> not be a problem in cases where continuations are ustilized directly
> >> (that is without our wrappers), as in case of say ServiceMix CXF binding
> >> component.
> >>
> >> The problem is that when continuation.suspend(timeout) has been called,
> >> a resulting RuntimeException might not reach CXF JettyDestination (such
> >> that the original message with its phase chain can be preserved until
> >> the request is resumed) if some other application thread calls
> >> continuation.resume() or continuation suspend timeout expires.
> >>
> >> In case of ServiceMix the latter is a theoretical possibility at the
> >> least. I can see in its code this timeout is configured, but if this
> >> timeout is in the region of up to 1 sec or so then it's feasible that
> >> with a heavy  workload the race condition described above might come to
> >> life.
> >>
> >> That said, as part of my test, I found that even when such condition
> >> occurs, the 'worst' thing which can happen is that a new message and a
> >> new chain are created, that is, the request is not resumed from a
> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
> >> request alltogether, but it all works nonetheless, as all the stack
> >> variables used in various interceptors in my given test at least are all
> >> obtained from a message. The only downside is that that the work which
> >> has already been done earlier as part of handling the suspended request
> >> is repeated again by the interceptors. It can cause issues though in
> >> cases when some interceptors have sideeffects as part of handling a
> >> given input request, say modify a db, etc
> >>
> >> Now, this race condition can be safely avoided if a wrapper proposed by
> >> Dan is used by a server application code as the message can be preserved
> >> immediately at a point a user calls suspend on our wrapper, so without
> >> further doubts I've prototyped it too. It's not possible for SMX
> >> components though
> >>
> >> Comments ?
> >>
> >> Cheers, Sergey
> >>
> >>> I guess my thinking was to tie the continutations directly to the
> >>> PhaseInterceptorChain (since that is going to need to know about them
> >>> anyway).   However, I suppose it could easily be done with a new
> >>> interface. Probably the best thing to do is to stub out a sample
> >>> usecase.   So here goes.....
> >>>
> >>> Lets take a "GreetMe" web service that in the greetMe method will call
> >>> off asynchrously to some JMS service to actually get the result.
> >>>
> >>> @Resource(name = "jmsClient")
> >>> Greeter jmsGreeter
> >>> @Resource
> >>> WebServiceContext context;
> >>> public String greetMe(String arg) {
> >>>     ContinuationSupport contSupport = (ContinuationSupport)
> >>>              context.get(ContinuationSupport.class.getName());
> >>>     if (contSupport == null) {
> >>>          //continuations not supported, must wait
> >>>          return jmsGreeter.greetMe(arg);
> >>>     }
> >>>     Continuation cont = contSupport.getContinuation();
> >>>     if (cont.isResumed()) {
> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
> >>>        return handler.get().getReturn();
> >>>     } else {
> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
> >>>         jmsGreeter.greetMeAsync(arg, handler);
> >>>         cont.suspend(handler);
> >>> return null;   //won't actually get here as suspend will throw a
> >>> ContinuationException
> >>>     }
> >>> }
> >>>
> >>> The Handler would look something like:
> >>> class Handler implements AsyncHandler<GreetMeResponse> {
> >>> GreetMeResponse resp;
> >>>        Continuation cont;
> >>> public Handler(Continuation cont) {
> >>>            this.cont = cont;
> >>>        }
> >>>        public void handleResponse(Response<GreetMeLaterResponse>
> >>> response) { resp = response.get();
> >>>              cont.resume();
> >>>       }
> >>> }
> >>>
> >>> Basically, the HTTP/Jetty transport could provide an implementation of
> >>> ContinuationSupport that wrappers the jetty stuff.    JMS could provide
> >>> one that's pretty much a null op.   Transports that cannot support it
> >>> (like servlet) just wouldn't provide an implementation.
> >>>
> >>>
> >>> Does that make sense?   Other ideas?
> >>>
> >>> Dan
> >>>
> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
> >>>> > No.   We don't want that.   Whatever we do should work for other
> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to jetty
> >>>> > continuations directly.
> >>>>
> >>>> No, I'm not suggesting to tie it up to jetty continuations.
> >>>> Ex.
> >>>>
> >>>> try {
> >>>>   invoke(); // continuation.suspend() somehow by the code being
> >>>> invoked upon }
> >>>> catch (RuntimeException ex) {
> >>>>
> >>>> if
> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
> >>>> throw new SuspendedFault(ex);
> >>>>     // or PhaseInterceptorChain.suspend()
> >>>> }
> >>>> }
> >>>>
> >>>> > Most likely, we could add a "suspend()" method to
> >>>> > PhaseInterceptorChain that would do something very similar and throw
> >>>> > a "SuspendException" or something in the same package as
> >>>> > PhaseInterceptorChain.
> >>>>
> >>>> When do we trigger this PhaseInterceptorChain.suspend() call though ?
> >>>>
> >>>> >   That would get propogated
> >>>> > back to the JettyDestination that could then call the jetty things. 
> >>>> >  The JMS transport could just catch it and more or less ignore it.  
> >>>> >  We'd then have to add a "resume()" method to the chain which would
> >>>> > call back onto a listener that the transport provides.   Jetty would
> >>>> > just call the jetty resume stuff. JMS would probably put a runnable
> >>>> > on the workqueue to restart the chain.
> >>>>
> >>>> ok
> >>>>
> >>>> > Also, suspend() would need to check if there is a listener.  If not,
> >>>> > it should not throw the exception.   Thus, the servlet transport and
> >>>> > CORBA stuff that couldn't do this would pretty much just ignore it.
> >>>>
> >>>> ok, not sure I understand about the listener but I think I see what
> >>>> you mean...
> >>>>
> >>>> > Basically, this needs to be done in such a way that it CAN work for
> >>>> > the non-jetty cases.   However, it also needs to be done in a way
> >>>> > that doesn't affect existing transports.
> >>>>
> >>>> +1
> >>>>
> >>>> Cheers, Sergey
> >>>>
> >>>> > Dan
> >>>> >
> >>>> >> 2. Now, if the above can be figured out, the next problem arises:
> >>>> >> when the "trigger" to wake up the continuation occurs
> >>>> >>
> >>>> >> I think we can can do in JettyDestination omething similar to what
> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
> >>>> >> extract from it the original continuation instance or else we can
> >>>> >> do ContinuationSupport.getContinuation(request) which should return
> >>>> >> us the instance. At this point we can use it as a ket to store the
> >>>> >> current exchange plus all the other info we may need.
> >>>> >>
> >>>> >> When the user/application code does continuation.resume(), the
> >>>> >> Jetty thread will come back and we will use the
> >>>> >> ContinuationSupport.getContinuation(request) to get us the active
> >>>> >> continuation and use it to extract the suspended exchange and
> >>>> >> proceed from there, say we'll call PhaseInterceptorPhase.resume(),
> >>>> >> etc, something along the lines you suggested
> >>>> >>
> >>>> >>
> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
> >>>> >> everything to make sure nothing is stored on the stack and is
> >>>> >> "resumable". Once that is done, the rest is relatively easy.
> >>>> >>
> >>>> >> Yea - probably can be the quite challenging
> >>>> >>
> >>>> >>
> >>>> >> Thoughts ?
> >>>> >>
> >>>> >> Cheers, Sergey
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
> >>>> >> [3]
> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
> >>>> >>42361 #ac tion_12642361
> >>>> >
> >>>> > --
> >>>> > Daniel Kulp
> >>>> > dkulp@apache.org
> >>>> > http://dankulp.com/blog
> >>>
> >>> --
> >>> Daniel Kulp
> >>> dkulp@apache.org
> >>> http://dankulp.com/blog



-- 
Daniel Kulp
dkulp@apache.org
http://dankulp.com/blog

Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
That said :-), even with this extremely small time window I'm getting log warnings. I guess it's due to a nature of my tests. I have 
contol threads waiting in the application code which do continuation.resume() as soon as they're notified that 
continuation.suspend() has occured.

I have 10 threads involved, 5 control ones + 5 application ones, I see a loss of message approximately once in 5 cases.
The fact that cont.resume() is done virtually immediately after cont.suspend() can explain it.

Cheers, Sergey

> That said, I'm now trying to inject a message as a custom continuation object (while preserving the original one if any, both 
> ways) as early as possible, in AbstractInvoker, so the time window at which the race condition I talked about earlier can cause 
> the loss of the original message, is extremely small the time it taked for the continuation.suspend() exception to reach a catch 
> block in AbstractInvoker.
>
> Cheers, Sergey
>
>> Hi,
>>
>> I did some system testing with Jetty continuations and it's going not too bad.
>> Here's one issue which I've encountered which might or might not be a problem in cases where continuations are ustilized directly 
>> (that is without our wrappers), as in case of say ServiceMix CXF binding component.
>>
>> The problem is that when continuation.suspend(timeout) has been called, a resulting RuntimeException might not reach CXF 
>> JettyDestination (such that the original message with its phase chain can be preserved until the request is resumed) if some 
>> other application thread calls continuation.resume() or continuation suspend timeout expires.
>>
>> In case of ServiceMix the latter is a theoretical possibility at the least. I can see in its code this timeout is configured, but 
>> if this timeout is in the region of up to 1 sec or so then it's feasible that with a heavy  workload the race condition described 
>> above might come to life.
>>
>> That said, as part of my test, I found that even when such condition occurs, the 'worst' thing which can happen is that a new 
>> message and a new chain are created, that is, the request is not resumed from a 'suspended' ServiceInvokerInterceptor, but starts 
>> as if it was a new request alltogether, but it all works nonetheless, as all the stack variables used in various interceptors in 
>> my given test at least are all obtained from a message. The only downside is that that the work which has already been done 
>> earlier as part of handling the suspended request is repeated again by the interceptors. It can cause issues though in cases when 
>> some interceptors have sideeffects as part of handling a given input request, say modify a db, etc
>>
>> Now, this race condition can be safely avoided if a wrapper proposed by Dan is used by a server application code as the message 
>> can be preserved immediately at a point a user calls suspend on our wrapper, so without further doubts I've prototyped it too. 
>> It's not possible for SMX components though
>>
>> Comments ?
>>
>> Cheers, Sergey
>>
>>>
>>> I guess my thinking was to tie the continutations directly to the
>>> PhaseInterceptorChain (since that is going to need to know about them
>>> anyway).   However, I suppose it could easily be done with a new interface.
>>> Probably the best thing to do is to stub out a sample usecase.   So here
>>> goes.....
>>>
>>> Lets take a "GreetMe" web service that in the greetMe method will call off
>>> asynchrously to some JMS service to actually get the result.
>>>
>>> @Resource(name = "jmsClient")
>>> Greeter jmsGreeter
>>> @Resource
>>> WebServiceContext context;
>>> public String greetMe(String arg) {
>>>     ContinuationSupport contSupport = (ContinuationSupport)
>>>              context.get(ContinuationSupport.class.getName());
>>>     if (contSupport == null) {
>>>          //continuations not supported, must wait
>>>          return jmsGreeter.greetMe(arg);
>>>     }
>>>     Continuation cont = contSupport.getContinuation();
>>>     if (cont.isResumed()) {
>>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>>>        return handler.get().getReturn();
>>>     } else {
>>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>>>         jmsGreeter.greetMeAsync(arg, handler);
>>>         cont.suspend(handler);
>>> return null;   //won't actually get here as suspend will throw a
>>> ContinuationException
>>>     }
>>> }
>>>
>>> The Handler would look something like:
>>> class Handler implements AsyncHandler<GreetMeResponse> {
>>> GreetMeResponse resp;
>>>        Continuation cont;
>>> public Handler(Continuation cont) {
>>>            this.cont = cont;
>>>        }
>>>        public void handleResponse(Response<GreetMeLaterResponse> response) {
>>>              resp = response.get();
>>>              cont.resume();
>>>       }
>>> }
>>>
>>> Basically, the HTTP/Jetty transport could provide an implementation of
>>> ContinuationSupport that wrappers the jetty stuff.    JMS could provide one
>>> that's pretty much a null op.   Transports that cannot support it (like
>>> servlet) just wouldn't provide an implementation.
>>>
>>>
>>> Does that make sense?   Other ideas?
>>>
>>> Dan
>>>
>>>
>>>
>>>
>>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>>> > No.   We don't want that.   Whatever we do should work for other
>>>> > transports as well like JMS.  Thus, this shouldn't be tied to jetty
>>>> > continuations directly.
>>>>
>>>> No, I'm not suggesting to tie it up to jetty continuations.
>>>> Ex.
>>>>
>>>> try {
>>>>   invoke(); // continuation.suspend() somehow by the code being invoked
>>>> upon }
>>>> catch (RuntimeException ex) {
>>>>
>>>> if (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>>>     throw new SuspendedFault(ex);
>>>>     // or PhaseInterceptorChain.suspend()
>>>> }
>>>> }
>>>>
>>>> > Most likely, we could add a "suspend()" method to PhaseInterceptorChain
>>>> > that would do something very similar and throw a "SuspendException" or
>>>> > something in the same package as PhaseInterceptorChain.
>>>>
>>>> When do we trigger this PhaseInterceptorChain.suspend() call though ?
>>>>
>>>> >   That would get propogated
>>>> > back to the JettyDestination that could then call the jetty things.   The
>>>> > JMS transport could just catch it and more or less ignore it.    We'd
>>>> > then have to add a "resume()" method to the chain which would call back
>>>> > onto a listener that the transport provides.   Jetty would just call the
>>>> > jetty resume stuff. JMS would probably put a runnable on the workqueue to
>>>> > restart the chain.
>>>>
>>>> ok
>>>>
>>>> > Also, suspend() would need to check if there is a listener.  If not, it
>>>> > should not throw the exception.   Thus, the servlet transport and CORBA
>>>> > stuff that couldn't do this would pretty much just ignore it.
>>>>
>>>> ok, not sure I understand about the listener but I think I see what you
>>>> mean...
>>>>
>>>> > Basically, this needs to be done in such a way that it CAN work for the
>>>> > non-jetty cases.   However, it also needs to be done in a way that
>>>> > doesn't affect existing transports.
>>>>
>>>> +1
>>>>
>>>> Cheers, Sergey
>>>>
>>>> > Dan
>>>> >
>>>> >> 2. Now, if the above can be figured out, the next problem arises: when
>>>> >> the "trigger" to wake up the continuation occurs
>>>> >>
>>>> >> I think we can can do in JettyDestination omething similar to what is
>>>> >> done in SMX. When getting a SuspendedFault exception, we can extract
>>>> >> from it the original continuation instance or else we can do
>>>> >> ContinuationSupport.getContinuation(request) which should return us the
>>>> >> instance. At this point we can use it as a ket to store the current
>>>> >> exchange plus all the other info we may need.
>>>> >>
>>>> >> When the user/application code does continuation.resume(), the Jetty
>>>> >> thread will come back and we will use the
>>>> >> ContinuationSupport.getContinuation(request) to get us the active
>>>> >> continuation and use it to extract the suspended exchange and proceed
>>>> >> from there, say we'll call PhaseInterceptorPhase.resume(), etc,
>>>> >> something along the lines you suggested
>>>> >>
>>>> >>
>>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>>>> >> everything to make sure nothing is stored on the stack and is
>>>> >> "resumable". Once that is done, the rest is relatively easy.
>>>> >>
>>>> >> Yea - probably can be the quite challenging
>>>> >>
>>>> >>
>>>> >> Thoughts ?
>>>> >>
>>>> >> Cheers, Sergey
>>>> >>
>>>> >>
>>>> >>
>>>> >>
>>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>>> >> [3]
>>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361
>>>> >>#ac tion_12642361
>>>> >
>>>> > --
>>>> > Daniel Kulp
>>>> > dkulp@apache.org
>>>> > http://dankulp.com/blog
>>>
>>>
>>>
>>> -- 
>>> Daniel Kulp
>>> dkulp@apache.org
>>> http://dankulp.com/blog
>>
> 


Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
That said, I'm now trying to inject a message as a custom continuation object (while preserving the original one if any, both ways) 
as early as possible, in AbstractInvoker, so the time window at which the race condition I talked about earlier can cause the loss 
of the original message, is extremely small the time it taked for the continuation.suspend() exception to reach a catch block in 
AbstractInvoker.

Cheers, Sergey

> Hi,
>
> I did some system testing with Jetty continuations and it's going not too bad.
> Here's one issue which I've encountered which might or might not be a problem in cases where continuations are ustilized directly 
> (that is without our wrappers), as in case of say ServiceMix CXF binding component.
>
> The problem is that when continuation.suspend(timeout) has been called, a resulting RuntimeException might not reach CXF 
> JettyDestination (such that the original message with its phase chain can be preserved until the request is resumed) if some other 
> application thread calls continuation.resume() or continuation suspend timeout expires.
>
> In case of ServiceMix the latter is a theoretical possibility at the least. I can see in its code this timeout is configured, but 
> if this timeout is in the region of up to 1 sec or so then it's feasible that with a heavy  workload the race condition described 
> above might come to life.
>
> That said, as part of my test, I found that even when such condition occurs, the 'worst' thing which can happen is that a new 
> message and a new chain are created, that is, the request is not resumed from a 'suspended' ServiceInvokerInterceptor, but starts 
> as if it was a new request alltogether, but it all works nonetheless, as all the stack variables used in various interceptors in 
> my given test at least are all obtained from a message. The only downside is that that the work which has already been done 
> earlier as part of handling the suspended request is repeated again by the interceptors. It can cause issues though in cases when 
> some interceptors have sideeffects as part of handling a given input request, say modify a db, etc
>
> Now, this race condition can be safely avoided if a wrapper proposed by Dan is used by a server application code as the message 
> can be preserved immediately at a point a user calls suspend on our wrapper, so without further doubts I've prototyped it too. 
> It's not possible for SMX components though
>
> Comments ?
>
> Cheers, Sergey
>
>>
>> I guess my thinking was to tie the continutations directly to the
>> PhaseInterceptorChain (since that is going to need to know about them
>> anyway).   However, I suppose it could easily be done with a new interface.
>> Probably the best thing to do is to stub out a sample usecase.   So here
>> goes.....
>>
>> Lets take a "GreetMe" web service that in the greetMe method will call off
>> asynchrously to some JMS service to actually get the result.
>>
>> @Resource(name = "jmsClient")
>> Greeter jmsGreeter
>> @Resource
>> WebServiceContext context;
>> public String greetMe(String arg) {
>>     ContinuationSupport contSupport = (ContinuationSupport)
>>              context.get(ContinuationSupport.class.getName());
>>     if (contSupport == null) {
>>          //continuations not supported, must wait
>>          return jmsGreeter.greetMe(arg);
>>     }
>>     Continuation cont = contSupport.getContinuation();
>>     if (cont.isResumed()) {
>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>>        return handler.get().getReturn();
>>     } else {
>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>>         jmsGreeter.greetMeAsync(arg, handler);
>>         cont.suspend(handler);
>> return null;   //won't actually get here as suspend will throw a
>> ContinuationException
>>     }
>> }
>>
>> The Handler would look something like:
>> class Handler implements AsyncHandler<GreetMeResponse> {
>> GreetMeResponse resp;
>>        Continuation cont;
>> public Handler(Continuation cont) {
>>            this.cont = cont;
>>        }
>>        public void handleResponse(Response<GreetMeLaterResponse> response) {
>>              resp = response.get();
>>              cont.resume();
>>       }
>> }
>>
>> Basically, the HTTP/Jetty transport could provide an implementation of
>> ContinuationSupport that wrappers the jetty stuff.    JMS could provide one
>> that's pretty much a null op.   Transports that cannot support it (like
>> servlet) just wouldn't provide an implementation.
>>
>>
>> Does that make sense?   Other ideas?
>>
>> Dan
>>
>>
>>
>>
>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>> > No.   We don't want that.   Whatever we do should work for other
>>> > transports as well like JMS.  Thus, this shouldn't be tied to jetty
>>> > continuations directly.
>>>
>>> No, I'm not suggesting to tie it up to jetty continuations.
>>> Ex.
>>>
>>> try {
>>>   invoke(); // continuation.suspend() somehow by the code being invoked
>>> upon }
>>> catch (RuntimeException ex) {
>>>
>>> if (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>>     throw new SuspendedFault(ex);
>>>     // or PhaseInterceptorChain.suspend()
>>> }
>>> }
>>>
>>> > Most likely, we could add a "suspend()" method to PhaseInterceptorChain
>>> > that would do something very similar and throw a "SuspendException" or
>>> > something in the same package as PhaseInterceptorChain.
>>>
>>> When do we trigger this PhaseInterceptorChain.suspend() call though ?
>>>
>>> >   That would get propogated
>>> > back to the JettyDestination that could then call the jetty things.   The
>>> > JMS transport could just catch it and more or less ignore it.    We'd
>>> > then have to add a "resume()" method to the chain which would call back
>>> > onto a listener that the transport provides.   Jetty would just call the
>>> > jetty resume stuff. JMS would probably put a runnable on the workqueue to
>>> > restart the chain.
>>>
>>> ok
>>>
>>> > Also, suspend() would need to check if there is a listener.  If not, it
>>> > should not throw the exception.   Thus, the servlet transport and CORBA
>>> > stuff that couldn't do this would pretty much just ignore it.
>>>
>>> ok, not sure I understand about the listener but I think I see what you
>>> mean...
>>>
>>> > Basically, this needs to be done in such a way that it CAN work for the
>>> > non-jetty cases.   However, it also needs to be done in a way that
>>> > doesn't affect existing transports.
>>>
>>> +1
>>>
>>> Cheers, Sergey
>>>
>>> > Dan
>>> >
>>> >> 2. Now, if the above can be figured out, the next problem arises: when
>>> >> the "trigger" to wake up the continuation occurs
>>> >>
>>> >> I think we can can do in JettyDestination omething similar to what is
>>> >> done in SMX. When getting a SuspendedFault exception, we can extract
>>> >> from it the original continuation instance or else we can do
>>> >> ContinuationSupport.getContinuation(request) which should return us the
>>> >> instance. At this point we can use it as a ket to store the current
>>> >> exchange plus all the other info we may need.
>>> >>
>>> >> When the user/application code does continuation.resume(), the Jetty
>>> >> thread will come back and we will use the
>>> >> ContinuationSupport.getContinuation(request) to get us the active
>>> >> continuation and use it to extract the suspended exchange and proceed
>>> >> from there, say we'll call PhaseInterceptorPhase.resume(), etc,
>>> >> something along the lines you suggested
>>> >>
>>> >>
>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>>> >> everything to make sure nothing is stored on the stack and is
>>> >> "resumable". Once that is done, the rest is relatively easy.
>>> >>
>>> >> Yea - probably can be the quite challenging
>>> >>
>>> >>
>>> >> Thoughts ?
>>> >>
>>> >> Cheers, Sergey
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>> >> [3]
>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361
>>> >>#ac tion_12642361
>>> >
>>> > --
>>> > Daniel Kulp
>>> > dkulp@apache.org
>>> > http://dankulp.com/blog
>>
>>
>>
>> -- 
>> Daniel Kulp
>> dkulp@apache.org
>> http://dankulp.com/blog
> 


Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
Hi,

I did some system testing with Jetty continuations and it's going not too bad.
Here's one issue which I've encountered which might or might not be a problem in cases where continuations are ustilized directly 
(that is without our wrappers), as in case of say ServiceMix CXF binding component.

The problem is that when continuation.suspend(timeout) has been called, a resulting RuntimeException might not reach CXF 
JettyDestination (such that the original message with its phase chain can be preserved until the request is resumed) if some other 
application thread calls continuation.resume() or continuation suspend timeout expires.

In case of ServiceMix the latter is a theoretical possibility at the least. I can see in its code this timeout is configured, but if 
this timeout is in the region of up to 1 sec or so then it's feasible that with a heavy  workload the race condition described above 
might come to life.

That said, as part of my test, I found that even when such condition occurs, the 'worst' thing which can happen is that a new 
message and a new chain are created, that is, the request is not resumed from a 'suspended' ServiceInvokerInterceptor, but starts as 
if it was a new request alltogether, but it all works nonetheless, as all the stack variables used in various interceptors in my 
given test at least are all obtained from a message. The only downside is that that the work which has already been done earlier as 
part of handling the suspended request is repeated again by the interceptors. It can cause issues though in cases when some 
interceptors have sideeffects as part of handling a given input request, say modify a db, etc

Now, this race condition can be safely avoided if a wrapper proposed by Dan is used by a server application code as the message can 
be preserved immediately at a point a user calls suspend on our wrapper, so without further doubts I've prototyped it too. It's not 
possible for SMX components though

Comments ?

Cheers, Sergey

>
> I guess my thinking was to tie the continutations directly to the
> PhaseInterceptorChain (since that is going to need to know about them
> anyway).   However, I suppose it could easily be done with a new interface.
> Probably the best thing to do is to stub out a sample usecase.   So here
> goes.....
>
> Lets take a "GreetMe" web service that in the greetMe method will call off
> asynchrously to some JMS service to actually get the result.
>
> @Resource(name = "jmsClient")
> Greeter jmsGreeter
> @Resource
> WebServiceContext context;
> public String greetMe(String arg) {
>     ContinuationSupport contSupport = (ContinuationSupport)
>              context.get(ContinuationSupport.class.getName());
>     if (contSupport == null) {
>          //continuations not supported, must wait
>          return jmsGreeter.greetMe(arg);
>     }
>     Continuation cont = contSupport.getContinuation();
>     if (cont.isResumed()) {
> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>        return handler.get().getReturn();
>     } else {
>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>         jmsGreeter.greetMeAsync(arg, handler);
>         cont.suspend(handler);
> return null;   //won't actually get here as suspend will throw a
> ContinuationException
>     }
> }
>
> The Handler would look something like:
> class Handler implements AsyncHandler<GreetMeResponse> {
> GreetMeResponse resp;
>        Continuation cont;
> public Handler(Continuation cont) {
>            this.cont = cont;
>        }
>        public void handleResponse(Response<GreetMeLaterResponse> response) {
>              resp = response.get();
>              cont.resume();
>       }
> }
>
> Basically, the HTTP/Jetty transport could provide an implementation of
> ContinuationSupport that wrappers the jetty stuff.    JMS could provide one
> that's pretty much a null op.   Transports that cannot support it (like
> servlet) just wouldn't provide an implementation.
>
>
> Does that make sense?   Other ideas?
>
> Dan
>
>
>
>
> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>> > No.   We don't want that.   Whatever we do should work for other
>> > transports as well like JMS.  Thus, this shouldn't be tied to jetty
>> > continuations directly.
>>
>> No, I'm not suggesting to tie it up to jetty continuations.
>> Ex.
>>
>> try {
>>   invoke(); // continuation.suspend() somehow by the code being invoked
>> upon }
>> catch (RuntimeException ex) {
>>
>> if (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>     throw new SuspendedFault(ex);
>>     // or PhaseInterceptorChain.suspend()
>> }
>> }
>>
>> > Most likely, we could add a "suspend()" method to PhaseInterceptorChain
>> > that would do something very similar and throw a "SuspendException" or
>> > something in the same package as PhaseInterceptorChain.
>>
>> When do we trigger this PhaseInterceptorChain.suspend() call though ?
>>
>> >   That would get propogated
>> > back to the JettyDestination that could then call the jetty things.   The
>> > JMS transport could just catch it and more or less ignore it.    We'd
>> > then have to add a "resume()" method to the chain which would call back
>> > onto a listener that the transport provides.   Jetty would just call the
>> > jetty resume stuff. JMS would probably put a runnable on the workqueue to
>> > restart the chain.
>>
>> ok
>>
>> > Also, suspend() would need to check if there is a listener.  If not, it
>> > should not throw the exception.   Thus, the servlet transport and CORBA
>> > stuff that couldn't do this would pretty much just ignore it.
>>
>> ok, not sure I understand about the listener but I think I see what you
>> mean...
>>
>> > Basically, this needs to be done in such a way that it CAN work for the
>> > non-jetty cases.   However, it also needs to be done in a way that
>> > doesn't affect existing transports.
>>
>> +1
>>
>> Cheers, Sergey
>>
>> > Dan
>> >
>> >> 2. Now, if the above can be figured out, the next problem arises: when
>> >> the "trigger" to wake up the continuation occurs
>> >>
>> >> I think we can can do in JettyDestination omething similar to what is
>> >> done in SMX. When getting a SuspendedFault exception, we can extract
>> >> from it the original continuation instance or else we can do
>> >> ContinuationSupport.getContinuation(request) which should return us the
>> >> instance. At this point we can use it as a ket to store the current
>> >> exchange plus all the other info we may need.
>> >>
>> >> When the user/application code does continuation.resume(), the Jetty
>> >> thread will come back and we will use the
>> >> ContinuationSupport.getContinuation(request) to get us the active
>> >> continuation and use it to extract the suspended exchange and proceed
>> >> from there, say we'll call PhaseInterceptorPhase.resume(), etc,
>> >> something along the lines you suggested
>> >>
>> >>
>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>> >> everything to make sure nothing is stored on the stack and is
>> >> "resumable". Once that is done, the rest is relatively easy.
>> >>
>> >> Yea - probably can be the quite challenging
>> >>
>> >>
>> >> Thoughts ?
>> >>
>> >> Cheers, Sergey
>> >>
>> >>
>> >>
>> >>
>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>> >> [3]
>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361
>> >>#ac tion_12642361
>> >
>> > --
>> > Daniel Kulp
>> > dkulp@apache.org
>> > http://dankulp.com/blog
>
>
>
> -- 
> Daniel Kulp
> dkulp@apache.org
> http://dankulp.com/blog 


Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
Hi,

>
> I guess my thinking was to tie the continutations directly to the
> PhaseInterceptorChain (since that is going to need to know about them
> anyway).   However, I suppose it could easily be done with a new interface.
> Probably the best thing to do is to stub out a sample usecase.   So here
> goes.....
>
> Lets take a "GreetMe" web service that in the greetMe method will call off
> asynchrously to some JMS service to actually get the result.
>
> @Resource(name = "jmsClient")
> Greeter jmsGreeter
> @Resource
> WebServiceContext context;
> public String greetMe(String arg) {
>     ContinuationSupport contSupport = (ContinuationSupport)
>              context.get(ContinuationSupport.class.getName());
>     if (contSupport == null) {
>          //continuations not supported, must wait
>          return jmsGreeter.greetMe(arg);
>     }
>     Continuation cont = contSupport.getContinuation();
>     if (cont.isResumed()) {
> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>        return handler.get().getReturn();
>     } else {
>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>         jmsGreeter.greetMeAsync(arg, handler);
>         cont.suspend(handler);
> return null;   //won't actually get here as suspend will throw a
> ContinuationException
>     }
> }
>
> The Handler would look something like:
> class Handler implements AsyncHandler<GreetMeResponse> {
> GreetMeResponse resp;
>        Continuation cont;
> public Handler(Continuation cont) {
>            this.cont = cont;
>        }
>        public void handleResponse(Response<GreetMeLaterResponse> response) {
>              resp = response.get();
>              cont.resume();
>       }
> }
>

It's an interesting idea. Worth having it in mind. However, I'm thinking, how reasonable it is to expect that a user would want to 
write a Continuations code portable across multiple transports ? I'd imagine that a user which wishes to do explicit continuations 
would do them with HTTP transport in mind, well, at least now that Jetty Continuations are available, with Servlet 3.0 supporting 
suspended invocations too. Otherwise we'd need to come up with our own ContiniationsSupport and Continuation classes - that's why 
would user use JettyContiations support and expect ths code work say with JMS or indeed with some other transport other than HTTP ?

I think we have two scenarious to look at. First one is when a CXF acts as a request provider for some ServiceMix components 
(implicitly serving as 'application code') - which is what CXF-1835 is mostly about.

Another one is about CXF service application code doing Continuations explicitly. I think it's a more advanced issue to deal with 
and your suggestion is about dealing with this second issue, this is how I see it anyway. And it attempts to do it with the 
portability in mind.

At this stage I'd like to focus on the first case which would be equivalent to a user doing jetty Continuations explicitly (which is 
what SMX does) :

import org.mortbay.util.ajax.ContinuationSupport;
import org.mortbay.util.ajax.Continuation;

@Resource
WebServiceContext context;
public String greetMe(String arg) {
    Continuation cont = context.get(Continuation.class.getName());

    if (cont.isResumed()) {
          AsyncHandler<GreetMeResponse> handler = cont.getObject();
          return handler.get().getReturn();
    } else {
           Continuation cont =  ContinuationSupport.getContinuation();
           AsyncHandler<GreetMeResponse> handler = new Handler(cont);
           jmsGreeter.greetMeAsync(arg, handler);
           cont.suspend(handler);
           return null;
    }
}



> Basically, the HTTP/Jetty transport could provide an implementation of
> ContinuationSupport that wrappers the jetty stuff.    JMS could provide one
> that's pretty much a null op.   Transports that cannot support it (like
> servlet) just wouldn't provide an implementation.
>
>
> Does that make sense?   Other ideas?

I think it does - but I'd just like to start with just HTTP in mind, just to get going and consider a transport portability issue at 
the the next stage.

Cheers, Sergey

>
> Dan
>
>
>
>
> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>> > No.   We don't want that.   Whatever we do should work for other
>> > transports as well like JMS.  Thus, this shouldn't be tied to jetty
>> > continuations directly.
>>
>> No, I'm not suggesting to tie it up to jetty continuations.
>> Ex.
>>
>> try {
>>   invoke(); // continuation.suspend() somehow by the code being invoked
>> upon }
>> catch (RuntimeException ex) {
>>
>> if (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>     throw new SuspendedFault(ex);
>>     // or PhaseInterceptorChain.suspend()
>> }
>> }
>>
>> > Most likely, we could add a "suspend()" method to PhaseInterceptorChain
>> > that would do something very similar and throw a "SuspendException" or
>> > something in the same package as PhaseInterceptorChain.
>>
>> When do we trigger this PhaseInterceptorChain.suspend() call though ?
>>
>> >   That would get propogated
>> > back to the JettyDestination that could then call the jetty things.   The
>> > JMS transport could just catch it and more or less ignore it.    We'd
>> > then have to add a "resume()" method to the chain which would call back
>> > onto a listener that the transport provides.   Jetty would just call the
>> > jetty resume stuff. JMS would probably put a runnable on the workqueue to
>> > restart the chain.
>>
>> ok
>>
>> > Also, suspend() would need to check if there is a listener.  If not, it
>> > should not throw the exception.   Thus, the servlet transport and CORBA
>> > stuff that couldn't do this would pretty much just ignore it.
>>
>> ok, not sure I understand about the listener but I think I see what you
>> mean...
>>
>> > Basically, this needs to be done in such a way that it CAN work for the
>> > non-jetty cases.   However, it also needs to be done in a way that
>> > doesn't affect existing transports.
>>
>> +1
>>
>> Cheers, Sergey
>>
>> > Dan
>> >
>> >> 2. Now, if the above can be figured out, the next problem arises: when
>> >> the "trigger" to wake up the continuation occurs
>> >>
>> >> I think we can can do in JettyDestination omething similar to what is
>> >> done in SMX. When getting a SuspendedFault exception, we can extract
>> >> from it the original continuation instance or else we can do
>> >> ContinuationSupport.getContinuation(request) which should return us the
>> >> instance. At this point we can use it as a ket to store the current
>> >> exchange plus all the other info we may need.
>> >>
>> >> When the user/application code does continuation.resume(), the Jetty
>> >> thread will come back and we will use the
>> >> ContinuationSupport.getContinuation(request) to get us the active
>> >> continuation and use it to extract the suspended exchange and proceed
>> >> from there, say we'll call PhaseInterceptorPhase.resume(), etc,
>> >> something along the lines you suggested
>> >>
>> >>
>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>> >> everything to make sure nothing is stored on the stack and is
>> >> "resumable". Once that is done, the rest is relatively easy.
>> >>
>> >> Yea - probably can be the quite challenging
>> >>
>> >>
>> >> Thoughts ?
>> >>
>> >> Cheers, Sergey
>> >>
>> >>
>> >>
>> >>
>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>> >> [3]
>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361
>> >>#ac tion_12642361
>> >
>> > --
>> > Daniel Kulp
>> > dkulp@apache.org
>> > http://dankulp.com/blog
>
>
>
> -- 
> Daniel Kulp
> dkulp@apache.org
> http://dankulp.com/blog 


Re: Jetty Continuations in CXF

Posted by Daniel Kulp <dk...@apache.org>.
I guess my thinking was to tie the continutations directly to the 
PhaseInterceptorChain (since that is going to need to know about them 
anyway).   However, I suppose it could easily be done with a new interface.    
Probably the best thing to do is to stub out a sample usecase.   So here 
goes.....    

Lets take a "GreetMe" web service that in the greetMe method will call off 
asynchrously to some JMS service to actually get the result.  

@Resource(name = "jmsClient")
Greeter jmsGreeter
@Resource
WebServiceContext context;
public String greetMe(String arg) {
     ContinuationSupport contSupport = (ContinuationSupport)
              context.get(ContinuationSupport.class.getName());
     if (contSupport == null) {
          //continuations not supported, must wait
          return jmsGreeter.greetMe(arg);
     }
     Continuation cont = contSupport.getContinuation();
     if (cont.isResumed()) {
	 AsyncHandler<GreetMeResponse> handler = cont.getObject();
        return handler.get().getReturn();
     } else {
         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
         jmsGreeter.greetMeAsync(arg, handler);
         cont.suspend(handler);
	return null;   //won't actually get here as suspend will throw a 
ContinuationException
     }
}

The Handler would look something like:
class Handler implements AsyncHandler<GreetMeResponse> {        
	GreetMeResponse resp;
        Continuation cont;
	public Handler(Continuation cont) {
            this.cont = cont;
        }        
        public void handleResponse(Response<GreetMeLaterResponse> response) {
              resp = response.get();
              cont.resume();
       }
}

Basically, the HTTP/Jetty transport could provide an implementation of 
ContinuationSupport that wrappers the jetty stuff.    JMS could provide one 
that's pretty much a null op.   Transports that cannot support it (like 
servlet) just wouldn't provide an implementation.  

 
Does that make sense?   Other ideas?

Dan




On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
> > No.   We don't want that.   Whatever we do should work for other
> > transports as well like JMS.  Thus, this shouldn't be tied to jetty
> > continuations directly.
>
> No, I'm not suggesting to tie it up to jetty continuations.
> Ex.
>
> try {
>   invoke(); // continuation.suspend() somehow by the code being invoked
> upon }
> catch (RuntimeException ex) {
>
> if (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>     throw new SuspendedFault(ex);
>     // or PhaseInterceptorChain.suspend()
> }
> }
>
> > Most likely, we could add a "suspend()" method to PhaseInterceptorChain
> > that would do something very similar and throw a "SuspendException" or
> > something in the same package as PhaseInterceptorChain.
>
> When do we trigger this PhaseInterceptorChain.suspend() call though ?
>
> >   That would get propogated
> > back to the JettyDestination that could then call the jetty things.   The
> > JMS transport could just catch it and more or less ignore it.    We'd
> > then have to add a "resume()" method to the chain which would call back
> > onto a listener that the transport provides.   Jetty would just call the
> > jetty resume stuff. JMS would probably put a runnable on the workqueue to
> > restart the chain.
>
> ok
>
> > Also, suspend() would need to check if there is a listener.  If not, it
> > should not throw the exception.   Thus, the servlet transport and CORBA
> > stuff that couldn't do this would pretty much just ignore it.
>
> ok, not sure I understand about the listener but I think I see what you
> mean...
>
> > Basically, this needs to be done in such a way that it CAN work for the
> > non-jetty cases.   However, it also needs to be done in a way that
> > doesn't affect existing transports.
>
> +1
>
> Cheers, Sergey
>
> > Dan
> >
> >> 2. Now, if the above can be figured out, the next problem arises: when
> >> the "trigger" to wake up the continuation occurs
> >>
> >> I think we can can do in JettyDestination omething similar to what is
> >> done in SMX. When getting a SuspendedFault exception, we can extract
> >> from it the original continuation instance or else we can do
> >> ContinuationSupport.getContinuation(request) which should return us the
> >> instance. At this point we can use it as a ket to store the current
> >> exchange plus all the other info we may need.
> >>
> >> When the user/application code does continuation.resume(), the Jetty
> >> thread will come back and we will use the
> >> ContinuationSupport.getContinuation(request) to get us the active
> >> continuation and use it to extract the suspended exchange and proceed
> >> from there, say we'll call PhaseInterceptorPhase.resume(), etc,
> >> something along the lines you suggested
> >>
> >>
> >> 3. Basically, to do this "right", we'd need to audit pretty much
> >> everything to make sure nothing is stored on the stack and is
> >> "resumable". Once that is done, the rest is relatively easy.
> >>
> >> Yea - probably can be the quite challenging
> >>
> >>
> >> Thoughts ?
> >>
> >> Cheers, Sergey
> >>
> >>
> >>
> >>
> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
> >> [2] https://issues.apache.org/jira/browse/CXF-1835
> >> [3]
> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361
> >>#ac tion_12642361
> >
> > --
> > Daniel Kulp
> > dkulp@apache.org
> > http://dankulp.com/blog



-- 
Daniel Kulp
dkulp@apache.org
http://dankulp.com/blog

Re: Jetty Continuations in CXF

Posted by Guillaume Nodet <gn...@gmail.com>.
Btw, is the JMS transport able to not block a thread too ?  Given the
very nature of JMS, this should be even more feasible than for HTTP.

On Fri, Oct 24, 2008 at 3:42 PM, Daniel Kulp <dk...@apache.org> wrote:
> On Friday 24 October 2008 9:27:01 am Sergey Beryozkin wrote:
>> Dan, here're some comments :
>>
>> 1. "something would need to be done to allow the "suspend" exception thing
>> to propogate up, but without taking a jetty dependency into the core."
>>
>> I guess the basic thing we can do is to check the class name of the
>> exception (like exception.getClass().equals("JettyException")), and if it
>> matches the expected name then we can wrap up this exception in a
>> SuspendedFault exception, to be recognized by the rest of CXF runtime
>
> No.   We don't want that.   Whatever we do should work for other transports as
> well like JMS.  Thus, this shouldn't be tied to jetty continuations directly.
>
> Most likely, we could add a "suspend()" method to PhaseInterceptorChain that
> would do something very similar and throw a "SuspendException" or something
> in the same package as PhaseInterceptorChain.   That would get propogated
> back to the JettyDestination that could then call the jetty things.   The JMS
> transport could just catch it and more or less ignore it.    We'd then have
> to add a "resume()" method to the chain which would call back onto a listener
> that the transport provides.   Jetty would just call the jetty resume stuff.
> JMS would probably put a runnable on the workqueue to restart the chain.
>
> Also, suspend() would need to check if there is a listener.  If not, it should
> not throw the exception.   Thus, the servlet transport and CORBA stuff that
> couldn't do this would pretty much just ignore it.
>
> Basically, this needs to be done in such a way that it CAN work for the
> non-jetty cases.   However, it also needs to be done in a way that doesn't
> affect existing transports.
>
> Dan
>
>>
>> 2. Now, if the above can be figured out, the next problem arises: when
>> the "trigger" to wake up the continuation occurs
>>
>> I think we can can do in JettyDestination omething similar to what is done
>> in SMX. When getting a SuspendedFault exception, we can extract from it the
>> original continuation instance or else we can do
>> ContinuationSupport.getContinuation(request) which should return us the
>> instance. At this point we can use it as a ket to store the current
>> exchange plus all the other info we may need.
>>
>> When the user/application code does continuation.resume(), the Jetty thread
>> will come back and we will use the
>> ContinuationSupport.getContinuation(request) to get us the active
>> continuation and use it to extract the suspended exchange and proceed from
>> there, say we'll call PhaseInterceptorPhase.resume(), etc, something along
>> the lines you suggested
>>
>>
>> 3. Basically, to do this "right", we'd need to audit pretty much everything
>> to make sure nothing is stored on the stack and is "resumable". Once that
>> is done, the rest is relatively easy.
>>
>> Yea - probably can be the quite challenging
>>
>>
>> Thoughts ?
>>
>> Cheers, Sergey
>>
>>
>>
>>
>> [1] http://docs.codehaus.org/display/JETTY/Continuations
>> [2] https://issues.apache.org/jira/browse/CXF-1835
>> [3]
>> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361#ac
>>tion_12642361
>
>
>
> --
> Daniel Kulp
> dkulp@apache.org
> http://dankulp.com/blog
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com

Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
> 
> No.   We don't want that.   Whatever we do should work for other transports as 
> well like JMS.  Thus, this shouldn't be tied to jetty continuations directly.

No, I'm not suggesting to tie it up to jetty continuations.
Ex.

try {
  invoke(); // continuation.suspend() somehow by the code being invoked upon
}
catch (RuntimeException ex) {

if (ex.getClass().getName().equals("jetty.JettyContinuationException"))
    throw new SuspendedFault(ex);
    // or PhaseInterceptorChain.suspend()
}
}


> 
> Most likely, we could add a "suspend()" method to PhaseInterceptorChain that 
> would do something very similar and throw a "SuspendException" or something 
> in the same package as PhaseInterceptorChain.

When do we trigger this PhaseInterceptorChain.suspend() call though ?



>   That would get propogated 
> back to the JettyDestination that could then call the jetty things.   The JMS 
> transport could just catch it and more or less ignore it.    We'd then have 
> to add a "resume()" method to the chain which would call back onto a listener 
> that the transport provides.   Jetty would just call the jetty resume stuff.   
> JMS would probably put a runnable on the workqueue to restart the chain.

ok

> 
> Also, suspend() would need to check if there is a listener.  If not, it should 
> not throw the exception.   Thus, the servlet transport and CORBA stuff that 
> couldn't do this would pretty much just ignore it.

ok, not sure I understand about the listener but I think I see what you mean...

> 
> Basically, this needs to be done in such a way that it CAN work for the 
> non-jetty cases.   However, it also needs to be done in a way that doesn't 
> affect existing transports.

+1

Cheers, Sergey

> 
> Dan
> 
>>
>> 2. Now, if the above can be figured out, the next problem arises: when
>> the "trigger" to wake up the continuation occurs
>>
>> I think we can can do in JettyDestination omething similar to what is done
>> in SMX. When getting a SuspendedFault exception, we can extract from it the
>> original continuation instance or else we can do
>> ContinuationSupport.getContinuation(request) which should return us the
>> instance. At this point we can use it as a ket to store the current
>> exchange plus all the other info we may need.
>>
>> When the user/application code does continuation.resume(), the Jetty thread
>> will come back and we will use the
>> ContinuationSupport.getContinuation(request) to get us the active
>> continuation and use it to extract the suspended exchange and proceed from
>> there, say we'll call PhaseInterceptorPhase.resume(), etc, something along
>> the lines you suggested
>>
>>
>> 3. Basically, to do this "right", we'd need to audit pretty much everything
>> to make sure nothing is stored on the stack and is "resumable". Once that
>> is done, the rest is relatively easy.
>>
>> Yea - probably can be the quite challenging
>>
>>
>> Thoughts ?
>>
>> Cheers, Sergey
>>
>>
>>
>>
>> [1] http://docs.codehaus.org/display/JETTY/Continuations
>> [2] https://issues.apache.org/jira/browse/CXF-1835
>> [3]
>> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361#ac
>>tion_12642361
> 
> 
> 
> -- 
> Daniel Kulp
> dkulp@apache.org
> http://dankulp.com/blog

Re: Jetty Continuations in CXF

Posted by Daniel Kulp <dk...@apache.org>.
On Friday 24 October 2008 9:27:01 am Sergey Beryozkin wrote:
> Dan, here're some comments :
>
> 1. "something would need to be done to allow the "suspend" exception thing
> to propogate up, but without taking a jetty dependency into the core."
>
> I guess the basic thing we can do is to check the class name of the
> exception (like exception.getClass().equals("JettyException")), and if it
> matches the expected name then we can wrap up this exception in a
> SuspendedFault exception, to be recognized by the rest of CXF runtime

No.   We don't want that.   Whatever we do should work for other transports as 
well like JMS.  Thus, this shouldn't be tied to jetty continuations directly.

Most likely, we could add a "suspend()" method to PhaseInterceptorChain that 
would do something very similar and throw a "SuspendException" or something 
in the same package as PhaseInterceptorChain.   That would get propogated 
back to the JettyDestination that could then call the jetty things.   The JMS 
transport could just catch it and more or less ignore it.    We'd then have 
to add a "resume()" method to the chain which would call back onto a listener 
that the transport provides.   Jetty would just call the jetty resume stuff.   
JMS would probably put a runnable on the workqueue to restart the chain.

Also, suspend() would need to check if there is a listener.  If not, it should 
not throw the exception.   Thus, the servlet transport and CORBA stuff that 
couldn't do this would pretty much just ignore it.

Basically, this needs to be done in such a way that it CAN work for the 
non-jetty cases.   However, it also needs to be done in a way that doesn't 
affect existing transports.

Dan

>
> 2. Now, if the above can be figured out, the next problem arises: when
> the "trigger" to wake up the continuation occurs
>
> I think we can can do in JettyDestination omething similar to what is done
> in SMX. When getting a SuspendedFault exception, we can extract from it the
> original continuation instance or else we can do
> ContinuationSupport.getContinuation(request) which should return us the
> instance. At this point we can use it as a ket to store the current
> exchange plus all the other info we may need.
>
> When the user/application code does continuation.resume(), the Jetty thread
> will come back and we will use the
> ContinuationSupport.getContinuation(request) to get us the active
> continuation and use it to extract the suspended exchange and proceed from
> there, say we'll call PhaseInterceptorPhase.resume(), etc, something along
> the lines you suggested
>
>
> 3. Basically, to do this "right", we'd need to audit pretty much everything
> to make sure nothing is stored on the stack and is "resumable". Once that
> is done, the rest is relatively easy.
>
> Yea - probably can be the quite challenging
>
>
> Thoughts ?
>
> Cheers, Sergey
>
>
>
>
> [1] http://docs.codehaus.org/display/JETTY/Continuations
> [2] https://issues.apache.org/jira/browse/CXF-1835
> [3]
> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361#ac
>tion_12642361



-- 
Daniel Kulp
dkulp@apache.org
http://dankulp.com/blog

Re: Jetty Continuations in CXF

Posted by Sergey Beryozkin <se...@progress.com>.
Forwarding to the right address

Hi

I'd like to continue the discussion on how to handle Jetty continuations[1] in CXF[2] here.

In short the requirement is for CXF to be able to handle the application code (ServiceMix JBI consumers served by ServiceMix CXF 
BindingComponent in this case) doing explicit continuations.

Ex. CXF receives a request on a Jetty thread, creates an exchange and sends it further along to the consumer. Consumer is about to 
do some work so it spawns some activity or checks for some event and then does continuation.suspend(). This results in a specific 
Runtime exception being thrown.

The challenge is how to 'suspend' the interception chain, let this exception propagate down to the Jetty stack so that it can free 
the thread and put this pending request in its internal queue, and then resume it when a consumer code decides to do 
continuation.resume().

See [3] for a more detailed description of the issues.

Dan, here're some comments :

1. "something would need to be done to allow the "suspend" exception thing to propogate up,
but without taking a jetty dependency into the core."

I guess the basic thing we can do is to check the class name of the exception (like exception.getClass().equals("JettyException")), 
and if it matches the expected name then we can wrap up this exception in a SuspendedFault exception, to be recognized by the rest 
of CXF runtime

2. Now, if the above can be figured out, the next problem arises: when
the "trigger" to wake up the continuation occurs

I think we can can do in JettyDestination omething similar to what is done in SMX. When getting a SuspendedFault exception, we can 
extract from it the original continuation instance or else we can do ContinuationSupport.getContinuation(request) which should 
return us the instance. At this point we can use it as a ket to store the current exchange plus all the other info we may need.

When the user/application code does continuation.resume(), the Jetty thread will come back and we will use the 
ContinuationSupport.getContinuation(request) to get us the active continuation and use it to extract the suspended exchange and 
proceed from there, say we'll call PhaseInterceptorPhase.resume(), etc, something along the lines you suggested


3. Basically, to do this "right", we'd need to audit pretty much everything to
make sure nothing is stored on the stack and is "resumable". Once that is
done, the rest is relatively easy.

Yea - probably can be the quite challenging


Thoughts ?

Cheers, Sergey




[1] http://docs.codehaus.org/display/JETTY/Continuations
[2] https://issues.apache.org/jira/browse/CXF-1835
[3] https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361#action_12642361