You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/06/18 18:05:41 UTC
svn commit: r786142 - in /cxf/trunk:
rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/
rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/
rt/transports/http-jetty/src/test/java/org/apache...
Author: dkulp
Date: Thu Jun 18 16:05:41 2009
New Revision: 786142
URL: http://svn.apache.org/viewvc?rev=786142&view=rev
Log:
[CXF-2302] Fixes to the HTTP continuations to work with keep-alives
Modified:
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java
Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java Thu Jun 18 16:05:41 2009
@@ -44,6 +44,7 @@
import org.apache.cxf.transport.http.AbstractHTTPDestination;
import org.apache.cxf.transport.http.HTTPSession;
import org.apache.cxf.transport.http_jetty.continuations.JettyContinuationProvider;
+import org.apache.cxf.transport.http_jetty.continuations.JettyContinuationWrapper;
import org.apache.cxf.transports.http.QueryHandler;
import org.apache.cxf.transports.http.QueryHandlerRegistry;
import org.apache.cxf.transports.http.StemMatchingQueryHandler;
@@ -280,7 +281,6 @@
}
MessageImpl inMessage = retrieveFromContinuation(req);
-
if (inMessage == null) {
inMessage = new MessageImpl();
@@ -302,6 +302,15 @@
incomingObserver.onMessage(inMessage);
resp.flushBuffer();
baseRequest.setHandled(true);
+ JettyContinuationProvider p = (JettyContinuationProvider)inMessage
+ .get(ContinuationProvider.class.getName());
+ if (p != null) {
+ //make sure the continuation is stripped down
+ JettyContinuationWrapper c = p.getContinuation(false);
+ if (c != null) {
+ c.done();
+ }
+ }
} catch (SuspendedInvocationException ex) {
throw ex.getRuntimeException();
} catch (Fault ex) {
@@ -340,7 +349,7 @@
// if any, need to be restored
cont.setObject(ci.getUserObject());
}
- if (m == null && !cont.isNew()) {
+ if (m == null && (cont.isPending() || cont.isResumed())) {
String message = "No message for existing continuation, status : "
+ (cont.isPending() ? "Pending" : "Resumed");
if (!(o instanceof ContinuationInfo)) {
@@ -349,7 +358,6 @@
LOG.warning(message);
}
}
-
return m;
}
Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java Thu Jun 18 16:05:41 2009
@@ -29,17 +29,23 @@
private HttpServletRequest request;
private Message inMessage;
+ private JettyContinuationWrapper wrapper;
public JettyContinuationProvider(HttpServletRequest req, Message m) {
request = req;
this.inMessage = m;
}
-
public Continuation getContinuation() {
+ return getContinuation(true);
+ }
+ public JettyContinuationWrapper getContinuation(boolean create) {
if (inMessage.getExchange().isOneWay()) {
return null;
}
- return new JettyContinuationWrapper(request, inMessage);
+ if (wrapper == null && create) {
+ wrapper = new JettyContinuationWrapper(request, inMessage);
+ }
+ return wrapper;
}
}
Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java Thu Jun 18 16:05:41 2009
@@ -40,11 +40,15 @@
}
public Object getObject() {
- return continuation.getObject();
+ Object o = continuation.getObject();
+ if (o instanceof ContinuationInfo) {
+ return ((ContinuationInfo)o).getUserObject();
+ }
+ return o;
}
public boolean isNew() {
- return continuation.isNew();
+ return continuation.isNew() || (!continuation.isPending() && !continuation.isResumed());
}
public boolean isPending() {
@@ -92,12 +96,22 @@
throw new SuspendedInvocationException(ex);
}
}
+
+ public void done() {
+ ContinuationInfo ci = null;
+ Object obj = continuation.getObject();
+ if (obj instanceof ContinuationInfo) {
+ ci = (ContinuationInfo)obj;
+ continuation.setObject(ci.getUserObject());
+ }
+ continuation.reset();
+ }
protected Message getMessage() {
return message;
}
- protected org.mortbay.util.ajax.Continuation getContinuation() {
+ public org.mortbay.util.ajax.Continuation getContinuation() {
return continuation;
}
Modified: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java Thu Jun 18 16:05:41 2009
@@ -67,7 +67,7 @@
cw.isNew();
cw.isPending();
cw.isResumed();
- assertSame(ci, cw.getObject());
+ assertNull(cw.getObject());
cw.setObject(userObject);
cw.reset();
cw.resume();
Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java Thu Jun 18 16:05:41 2009
@@ -74,7 +74,7 @@
}
@Test
- public void testHttpWrappedContinuatuions() throws Exception {
+ public void testHttpWrappedContinuations() throws Exception {
SpringBusFactory bf = new SpringBusFactory();
Bus bus = bf.createBus(CLIENT_CONFIG_FILE);
BusFactory.setDefaultBus(bus);
@@ -115,6 +115,10 @@
executor.shutdownNow();
assertEquals("Not all invocations have been resumed", 0, controlDoneSignal.getCount());
assertEquals("Not all invocations have completed", 0, helloDoneSignal.getCount());
+
+ helloPort.sayHi("Dan1", "to:100");
+ helloPort.sayHi("Dan2", "to:100");
+ helloPort.sayHi("Dan3", "to:100");
}
Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java?rev=786142&r1=786141&r2=786142&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java Thu Jun 18 16:05:41 2009
@@ -54,7 +54,11 @@
Object userObject = secondName != null && secondName.length() > 0
? secondName : null;
continuation.setObject(userObject);
- suspendInvocation(firstName, continuation);
+ long timeout = 20000;
+ if (secondName.startsWith("to:")) {
+ timeout = Long.parseLong(secondName.substring(3));
+ }
+ suspendInvocation(firstName, continuation, timeout);
} else {
StringBuilder sb = new StringBuilder();
sb.append(firstName);
@@ -99,9 +103,9 @@
}
}
- private void suspendInvocation(String name, Continuation cont) {
+ private void suspendInvocation(String name, Continuation cont, long timeout) {
try {
- cont.suspend(20000);
+ cont.suspend(timeout);
} finally {
synchronized (suspended) {
suspended.put(name, cont);