You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/06/18 18:05:41 UTC

svn commit: r786142 - in /cxf/trunk: rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/ rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/ rt/transports/http-jetty/src/test/java/org/apache...

Author: dkulp
Date: Thu Jun 18 16:05:41 2009
New Revision: 786142

URL: http://svn.apache.org/viewvc?rev=786142&view=rev
Log:
[CXF-2302] Fixes to the HTTP continuations to work with keep-alives

Modified:
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
    cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java

Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java Thu Jun 18 16:05:41 2009
@@ -44,6 +44,7 @@
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
 import org.apache.cxf.transport.http.HTTPSession;
 import org.apache.cxf.transport.http_jetty.continuations.JettyContinuationProvider;
+import org.apache.cxf.transport.http_jetty.continuations.JettyContinuationWrapper;
 import org.apache.cxf.transports.http.QueryHandler;
 import org.apache.cxf.transports.http.QueryHandlerRegistry;
 import org.apache.cxf.transports.http.StemMatchingQueryHandler;
@@ -280,7 +281,6 @@
         }
         MessageImpl inMessage = retrieveFromContinuation(req);
         
-        
         if (inMessage == null) {
             
             inMessage = new MessageImpl();
@@ -302,6 +302,15 @@
             incomingObserver.onMessage(inMessage);
             resp.flushBuffer();
             baseRequest.setHandled(true);
+            JettyContinuationProvider p = (JettyContinuationProvider)inMessage
+                .get(ContinuationProvider.class.getName());
+            if (p != null) {
+                //make sure the continuation is stripped down 
+                JettyContinuationWrapper c = p.getContinuation(false);
+                if (c != null) {
+                    c.done();
+                }
+            }
         } catch (SuspendedInvocationException ex) {
             throw ex.getRuntimeException();
         } catch (Fault ex) {
@@ -340,7 +349,7 @@
                 // if any, need to be restored
                 cont.setObject(ci.getUserObject());
             }
-            if (m == null && !cont.isNew()) {
+            if (m == null && (cont.isPending() || cont.isResumed())) {
                 String message = "No message for existing continuation, status : "
                     + (cont.isPending() ? "Pending" : "Resumed");
                 if (!(o instanceof ContinuationInfo)) {
@@ -349,7 +358,6 @@
                 LOG.warning(message);
             }
         }
-        
         return m;
     }
     

Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java Thu Jun 18 16:05:41 2009
@@ -29,17 +29,23 @@
 
     private HttpServletRequest request;
     private Message inMessage; 
+    private JettyContinuationWrapper wrapper;
     
     public JettyContinuationProvider(HttpServletRequest req, Message m) {
         request = req;
         this.inMessage = m;
     }
-    
     public Continuation getContinuation() {
+        return getContinuation(true);
+    }    
+    public JettyContinuationWrapper getContinuation(boolean create) {
         if (inMessage.getExchange().isOneWay()) {
             return null;
         }
-        return new JettyContinuationWrapper(request, inMessage);
+        if (wrapper == null && create) {
+            wrapper = new JettyContinuationWrapper(request, inMessage);
+        }
+        return wrapper;
     }
 
 }

Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java Thu Jun 18 16:05:41 2009
@@ -40,11 +40,15 @@
     }
 
     public Object getObject() {
-        return continuation.getObject();
+        Object o = continuation.getObject();
+        if (o instanceof ContinuationInfo) {
+            return ((ContinuationInfo)o).getUserObject();
+        }
+        return o;
     }
 
     public boolean isNew() {
-        return continuation.isNew();
+        return continuation.isNew() || (!continuation.isPending() && !continuation.isResumed());
     }
 
     public boolean isPending() {
@@ -92,12 +96,22 @@
             throw new SuspendedInvocationException(ex);
         }
     }
+    
+    public void done() {
+        ContinuationInfo ci = null;
+        Object obj = continuation.getObject();
+        if (obj instanceof ContinuationInfo) {
+            ci = (ContinuationInfo)obj;
+            continuation.setObject(ci.getUserObject());
+        }
+        continuation.reset();
+    }
 
     protected Message getMessage() {
         return message;
     }
     
-    protected org.mortbay.util.ajax.Continuation getContinuation() {
+    public org.mortbay.util.ajax.Continuation getContinuation() {
         return continuation;
     }
     

Modified: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java Thu Jun 18 16:05:41 2009
@@ -67,7 +67,7 @@
         cw.isNew();
         cw.isPending();
         cw.isResumed();
-        assertSame(ci, cw.getObject());
+        assertNull(cw.getObject());
         cw.setObject(userObject);
         cw.reset();
         cw.resume();

Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java Thu Jun 18 16:05:41 2009
@@ -74,7 +74,7 @@
     }
 
     @Test
-    public void testHttpWrappedContinuatuions() throws Exception {
+    public void testHttpWrappedContinuations() throws Exception {
         SpringBusFactory bf = new SpringBusFactory();
         Bus bus = bf.createBus(CLIENT_CONFIG_FILE);
         BusFactory.setDefaultBus(bus);
@@ -115,6 +115,10 @@
         executor.shutdownNow();
         assertEquals("Not all invocations have been resumed", 0, controlDoneSignal.getCount());
         assertEquals("Not all invocations have completed", 0, helloDoneSignal.getCount());
+        
+        helloPort.sayHi("Dan1", "to:100");
+        helloPort.sayHi("Dan2", "to:100");
+        helloPort.sayHi("Dan3", "to:100");
     }
     
     

Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java Thu Jun 18 16:05:41 2009
@@ -54,7 +54,11 @@
                 Object userObject = secondName != null && secondName.length() > 0 
                                     ? secondName : null;
                 continuation.setObject(userObject);
-                suspendInvocation(firstName, continuation);
+                long timeout = 20000;
+                if (secondName.startsWith("to:")) {
+                    timeout = Long.parseLong(secondName.substring(3));
+                }
+                suspendInvocation(firstName, continuation, timeout);
             } else {
                 StringBuilder sb = new StringBuilder();
                 sb.append(firstName);
@@ -99,9 +103,9 @@
         }
     }
     
-    private void suspendInvocation(String name, Continuation cont) {
+    private void suspendInvocation(String name, Continuation cont, long timeout) {
         try {
-            cont.suspend(20000);    
+            cont.suspend(timeout);    
         } finally {
             synchronized (suspended) {
                 suspended.put(name, cont);