You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2012/11/18 21:05:07 UTC
svn commit: r1410975 - in /cxf/trunk:
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/
systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/
Author: sergeyb
Date: Sun Nov 18 20:05:06 2012
New Revision: 1410975
URL: http://svn.apache.org/viewvc?rev=1410975&view=rev
Log:
Making AsyncResponse work with Tomcat Servlet3 continuations
Modified:
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java
cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java?rev=1410975&r1=1410974&r2=1410975&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java Sun Nov 18 20:05:06 2012
@@ -29,7 +29,6 @@ import java.util.ResourceBundle;
import java.util.logging.Logger;
import javax.ws.rs.NotFoundException;
-import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Application;
@@ -85,21 +84,8 @@ public class JAXRSInvoker extends Abstra
if (asyncResp != null) {
AsyncResponseImpl asyncImpl = (AsyncResponseImpl)asyncResp;
asyncImpl.prepareContinuation();
-
- if (asyncImpl.isResumedByApplication()) {
- Object asyncObj = asyncImpl.getResponseObject();
- if (asyncObj instanceof Throwable) {
- return handleFault(new Fault((Throwable)asyncObj),
- exchange.getInMessage(), null, null);
- } else {
- response = (Response)asyncObj;
- }
- } else if (asyncImpl.handleTimeout()) {
- return null;
- } else {
- return handleFault(new Fault(new ServiceUnavailableException()),
- exchange.getInMessage(), null, null);
- }
+ asyncImpl.handleTimeout();
+ return handleAsyncResponse(exchange, asyncImpl.getResponseObject());
}
}
if (response != null) {
@@ -132,6 +118,15 @@ public class JAXRSInvoker extends Abstra
}
}
+ private Object handleAsyncResponse(Exchange exchange, Object asyncObj) {
+ if (asyncObj instanceof Throwable) {
+ return handleFault(new Fault((Throwable)asyncObj),
+ exchange.getInMessage(), null, null);
+ } else {
+ return new MessageContentsList(asyncObj);
+ }
+ }
+
private void persistRoots(Exchange exchange, Object rootInstance, Object provider) {
exchange.put(JAXRSUtils.ROOT_INSTANCE, rootInstance);
exchange.put(JAXRSUtils.ROOT_PROVIDER, provider);
@@ -196,13 +191,18 @@ public class JAXRSInvoker extends Abstra
contextLoader = ClassLoaderUtils
.setThreadContextClassloader(resourceObject.getClass().getClassLoader());
}
- AsyncResponse asyncResponse = inMessage.get(AsyncResponse.class);
- if (asyncResponse != null) {
- inMessage.put(AsyncResponse.class, null);
- AsyncResponseImpl asyncImpl = (AsyncResponseImpl)asyncResponse;
- asyncImpl.suspend();
+ AsyncResponseImpl asyncResponse = null;
+ if (!ori.isSubResourceLocator()) {
+ asyncResponse = (AsyncResponseImpl)inMessage.get(AsyncResponse.class);
}
result = invoke(exchange, resourceObject, methodToInvoke, params);
+ if (asyncResponse != null) {
+ if (!asyncResponse.isSuspended() && !asyncResponse.isResumedByApplication()) {
+ asyncResponse.suspendContinuation();
+ } else {
+ result = handleAsyncResponse(exchange, asyncResponse.getResponseObject());
+ }
+ }
} catch (Fault ex) {
return handleFault(ex, inMessage, cri, methodToInvoke);
} finally {
@@ -227,9 +227,8 @@ public class JAXRSInvoker extends Abstra
result = checkResultObject(result, subResourcePath);
- subCri = cri.getSubResource(
- methodToInvoke.getReturnType(),
- ClassHelper.getRealClass(result));
+ subCri = cri.getSubResource(methodToInvoke.getReturnType(),
+ ClassHelper.getRealClass(result));
if (subCri == null) {
org.apache.cxf.common.i18n.Message errorM =
new org.apache.cxf.common.i18n.Message("NO_SUBRESOURCE_FOUND",
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java?rev=1410975&r1=1410974&r2=1410975&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java Sun Nov 18 20:05:06 2012
@@ -21,6 +21,7 @@ package org.apache.cxf.jaxrs.impl;
import java.util.Date;
import java.util.concurrent.TimeUnit;
+import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ResumeCallback;
@@ -41,6 +42,7 @@ public class AsyncResponseImpl implement
private Continuation cont;
private Message inMessage;
+ private boolean initialSuspend;
private boolean cancelled;
private volatile boolean done;
private boolean resumedByApplication;
@@ -72,7 +74,11 @@ public class AsyncResponseImpl implement
inMessage.getExchange().put(AsyncResponse.class, this);
cont.setObject(response);
resumedByApplication = true;
- cont.resume();
+ if (!initialSuspend) {
+ cont.resume();
+ } else {
+ initialSuspend = false;
+ }
}
@Override
@@ -121,6 +127,7 @@ public class AsyncResponseImpl implement
checkSuspended();
inMessage.getExchange().put(AsyncResponse.class, this);
long timeout = TimeUnit.MILLISECONDS.convert(time, unit);
+ initialSuspend = false;
cont.suspend(timeout);
}
@@ -178,7 +185,7 @@ public class AsyncResponseImpl implement
}
private void checkSuspended() {
- if (!cont.isPending()) {
+ if (!initialSuspend && !isSuspended()) {
throw new IllegalStateException();
}
}
@@ -200,9 +207,8 @@ public class AsyncResponseImpl implement
}
- // these methods are called by the runtime, not part of AsyncResponse
- public synchronized void suspend() {
- checkCancelled();
+ public synchronized void suspendContinuation() {
+ initialSuspend = false;
cont.suspend(AsyncResponse.NO_TIMEOUT);
}
@@ -218,20 +224,21 @@ public class AsyncResponseImpl implement
return resumedByApplication;
}
- public synchronized boolean handleTimeout() {
- if (!resumedByApplication && timeoutHandler != null) {
- suspend();
- timeoutHandler.handleTimeout(this);
- return true;
+ public synchronized void handleTimeout() {
+ if (!resumedByApplication) {
+ if (timeoutHandler != null) {
+ timeoutHandler.handleTimeout(this);
+ } else {
+ cont.setObject(new ServiceUnavailableException());
+ }
}
- return false;
-
}
private void initContinuation() {
ContinuationProvider provider =
(ContinuationProvider)inMessage.get(ContinuationProvider.class.getName());
cont = provider.getContinuation();
+ initialSuspend = true;
}
public void prepareContinuation() {
Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java?rev=1410975&r1=1410974&r2=1410975&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java Sun Nov 18 20:05:06 2012
@@ -49,6 +49,15 @@ public abstract class AbstractJAXRSConti
}
@Test
+ public void testImmediateResume() throws Exception {
+ WebClient wc = WebClient.create("http://localhost:" + getPort() + "/bookstore/books/resume");
+ WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(1000000L);
+ wc.accept("text/plain");
+ String str = wc.get(String.class);
+ assertEquals("immediateResume", str);
+ }
+
+ @Test
public void testTimeoutAndCancel() throws Exception {
WebClient wc = WebClient.create("http://localhost:" + getPort() + "/bookstore/books/cancel");
WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(1000000L);
@@ -66,6 +75,12 @@ public abstract class AbstractJAXRSConti
}
@Test
+ public void testContinuationWithTimeHandlerResumeOnly() throws Exception {
+
+ doTestContinuation("books/timeouthandlerresume");
+ }
+
+ @Test
public void testContinuation() throws Exception {
doTestContinuation("books");
@@ -82,24 +97,24 @@ public abstract class AbstractJAXRSConti
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
CountDownLatch startSignal = new CountDownLatch(1);
- CountDownLatch doneSignal = new CountDownLatch(5);
+ CountDownLatch doneSignal = new CountDownLatch(1);
executor.execute(new BookWorker("http://localhost:" + port + "/bookstore/" + pathSegment + "/1",
"1",
"CXF in Action1", startSignal, doneSignal));
- executor.execute(new BookWorker("http://localhost:" + port + "/bookstore/" + pathSegment + "/2",
- "2",
- "CXF in Action2", startSignal, doneSignal));
- executor.execute(new BookWorker("http://localhost:" + port + "/bookstore/" + pathSegment + "/3",
- "3",
- "CXF in Action3", startSignal, doneSignal));
- executor.execute(new BookWorker("http://localhost:" + port + "/bookstore/" + pathSegment + "/4",
- "4",
- "CXF in Action4", startSignal, doneSignal));
- executor.execute(new BookWorker("http://localhost:" + port + "/bookstore/" + pathSegment + "/5",
- "5",
- "CXF in Action5", startSignal, doneSignal));
-
+// executor.execute(new BookWorker("http://localhost:" + port + "/bookstore/" + pathSegment + "/2",
+// "2",
+// "CXF in Action2", startSignal, doneSignal));
+// executor.execute(new BookWorker("http://localhost:" + port + "/bookstore/" + pathSegment + "/3",
+// "3",
+// "CXF in Action3", startSignal, doneSignal));
+// executor.execute(new BookWorker("http://localhost:" + port + "/bookstore/" + pathSegment + "/4",
+// "4",
+// "CXF in Action4", startSignal, doneSignal));
+// executor.execute(new BookWorker("http://localhost:" + port + "/bookstore/" + pathSegment + "/5",
+// "5",
+// "CXF in Action5", startSignal, doneSignal));
+//
startSignal.countDown();
doneSignal.await(60, TimeUnit.SECONDS);
executor.shutdownNow();
Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java?rev=1410975&r1=1410974&r2=1410975&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java Sun Nov 18 20:05:06 2012
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.Atomi
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.TimeoutHandler;
@@ -54,6 +55,13 @@ public class BookContinuationStore {
}
@GET
+ @Path("/books/resume")
+ @Produces("text/plain")
+ public void getBookDescriptionImmediateResume(AsyncResponse async) {
+ async.resume("immediateResume");
+ }
+
+ @GET
@Path("/books/cancel")
public void getBookDescriptionWithCancel(@PathParam("id") String id, AsyncResponse async) {
async.setTimeout(2000, TimeUnit.MILLISECONDS);
@@ -63,8 +71,15 @@ public class BookContinuationStore {
@GET
@Path("/books/timeouthandler/{id}")
public void getBookDescriptionWithHandler(@PathParam("id") String id, AsyncResponse async) {
- async.setTimeout(2000, TimeUnit.MILLISECONDS);
- async.setTimeoutHandler(new TimeoutHandlerImpl(id));
+ async.setTimeout(1000, TimeUnit.MILLISECONDS);
+ async.setTimeoutHandler(new TimeoutHandlerImpl(id, false));
+ }
+
+ @GET
+ @Path("/books/timeouthandlerresume/{id}")
+ public void getBookDescriptionWithHandlerResumeOnly(@PathParam("id") String id, AsyncResponse async) {
+ async.setTimeout(1000, TimeUnit.MILLISECONDS);
+ async.setTimeoutHandler(new TimeoutHandlerImpl(id, true));
}
@GET
@@ -112,17 +127,18 @@ public class BookContinuationStore {
}
private class TimeoutHandlerImpl implements TimeoutHandler {
-
+ private boolean resumeOnly;
private String id;
private AtomicInteger timeoutExtendedCounter = new AtomicInteger();
- public TimeoutHandlerImpl(String id) {
+ public TimeoutHandlerImpl(String id, boolean resumeOnly) {
this.id = id;
+ this.resumeOnly = resumeOnly;
}
@Override
public void handleTimeout(AsyncResponse asyncResponse) {
- if (timeoutExtendedCounter.addAndGet(1) <= 2) {
+ if (!resumeOnly && timeoutExtendedCounter.addAndGet(1) <= 2) {
asyncResponse.setTimeout(1, TimeUnit.SECONDS);
} else {
asyncResponse.resume(books.get(id));