You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2012/10/01 12:42:00 UTC
svn commit: r1392272 - in /cxf/trunk:
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/
systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/
Author: sergeyb
Date: Mon Oct 1 10:41:59 2012
New Revision: 1392272
URL: http://svn.apache.org/viewvc?rev=1392272&view=rev
Log:
[CXF-4455] More AsyncResponse related code
Modified:
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSContinuationsTest.java
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java?rev=1392272&r1=1392271&r2=1392272&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java Mon Oct 1 10:41:59 2012
@@ -82,15 +82,22 @@ public class JAXRSInvoker extends Abstra
AsyncResponse asyncResp = exchange.get(AsyncResponse.class);
if (asyncResp != null) {
AsyncResponseImpl asyncImpl = (AsyncResponseImpl)asyncResp;
- Object asyncObj = asyncImpl.getResponseObject();
- if (!(asyncObj instanceof Response)) {
+
+ if (asyncImpl.isResumedByApplication()) {
+ Object asyncObj = asyncImpl.getResponseObject();
if (asyncObj instanceof Throwable) {
- return handleFault(new Fault((Throwable)asyncObj), exchange.getInMessage(), null, null);
- } else {
+ return handleFault(new Fault((Throwable)asyncObj),
+ exchange.getInMessage(), null, null);
+ } else if (!(asyncObj instanceof Response)) {
response = Response.ok().entity(asyncObj).build();
+ } else {
+ response = (Response)asyncObj;
}
+ } else if (asyncImpl.handleTimeout()) {
+ return null;
} else {
- response = (Response)asyncObj;
+ return handleFault(new Fault(new WebApplicationException(503)),
+ exchange.getInMessage(), null, null);
}
}
}
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java?rev=1392272&r1=1392271&r2=1392272&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java Mon Oct 1 10:41:59 2012
@@ -25,20 +25,24 @@ import javax.ws.rs.container.AsyncRespon
import javax.ws.rs.container.TimeoutHandler;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.jaxrs.utils.HttpUtils;
import org.apache.cxf.message.Message;
public class AsyncResponseImpl implements AsyncResponse {
private Continuation cont;
- private Object responseObject;
- private long timeout = 5000;
+ private long timeout = AsyncResponse.NO_TIMEOUT;
private Message inMessage;
- private boolean suspended;
private boolean cancelled;
+ private boolean done;
+ private boolean newTimeoutRequested;
+ private boolean resumedByApplication;
+ private TimeoutHandler timeoutHandler;
public AsyncResponseImpl(Message inMessage) {
ContinuationProvider provider =
(ContinuationProvider)inMessage.get(ContinuationProvider.class.getName());
@@ -58,55 +62,68 @@ public class AsyncResponseImpl implement
doResume(response);
}
- private void doResume(Object response) throws IllegalStateException {
- responseObject = response;
+ private synchronized void doResume(Object response) throws IllegalStateException {
+ checkCancelled();
+ checkSuspended();
inMessage.getExchange().put(AsyncResponse.class, this);
- suspended = false;
+ cont.setObject(response);
+ resumedByApplication = true;
cont.resume();
}
@Override
public void cancel() {
- cancel(-1);
+ doCancel(null);
}
@Override
- //TODO: has to be long
public void cancel(int retryAfter) {
- cancelled = true;
- doResume(Response.status(503).header(HttpHeaders.RETRY_AFTER, Integer.toString(retryAfter)).build());
+ doCancel(Integer.toString(retryAfter));
}
@Override
public void cancel(Date retryAfter) {
- cancel((int)(retryAfter.getTime() - new Date().getTime()));
+ doCancel(HttpUtils.getHttpDateFormat().format(retryAfter));
+ }
+
+ /**
+ * Cancels the suspended request by resuming it with a 503 response.
+ *
+ * @param retryAfterHeader value to send as the Retry-After header, or
+ * null to send no Retry-After header
+ * @throws IllegalStateException if the continuation is not pending or the
+ * response was already cancelled
+ */
+ private synchronized void doCancel(String retryAfterHeader) {
+ checkSuspended();
+ ResponseBuilder rb = Response.status(503);
+ if (retryAfterHeader != null) {
+ rb.header(HttpHeaders.RETRY_AFTER, retryAfterHeader);
+ }
+ doResume(rb.build());
+ // Mark the cancellation only after doResume() has succeeded: doResume()
+ // begins with checkCancelled(), so raising the flag first would make
+ // every cancel() call fail with IllegalStateException.
+ cancelled = true;
}
@Override
- public boolean isSuspended() {
- return suspended;
+ public synchronized boolean isSuspended() {
+ return cont.isPending();
}
@Override
- public boolean isCancelled() {
+ public synchronized boolean isCancelled() {
return cancelled;
}
@Override
- public boolean isDone() {
- // TODO Auto-generated method stub
- return false;
+ public synchronized boolean isDone() {
+ return done;
}
+ /**
+ * Records a new suspend timeout and resumes the continuation so that the
+ * runtime can re-suspend the request with the new value (see handleTimeout()).
+ *
+ * @param time the timeout duration, expressed in {@code unit}
+ * @param unit the unit of {@code time}
+ * @throws IllegalStateException if the response was cancelled or the
+ * continuation is not pending
+ */
@Override
- public void setTimeout(long time, TimeUnit unit) throws IllegalStateException {
- // TODO Auto-generated method stub
+ public synchronized void setTimeout(long time, TimeUnit unit) throws IllegalStateException {
+ checkCancelled();
+ checkSuspended();
+ inMessage.getExchange().put(AsyncResponse.class, this);
+ // TimeUnit.convert(duration, sourceUnit) converts INTO the receiver's unit,
+ // so the receiver must be MILLISECONDS; the value is later passed to cont.suspend(timeout).
+ timeout = TimeUnit.MILLISECONDS.convert(time, unit);
+ newTimeoutRequested = true;
+ cont.resume();
}
@Override
public void setTimeoutHandler(TimeoutHandler handler) {
- // TODO Auto-generated method stub
-
+ timeoutHandler = handler;
}
@Override
@@ -133,13 +150,49 @@ public class AsyncResponseImpl implement
return null;
}
+ private void checkCancelled() {
+ if (cancelled) {
+ throw new IllegalStateException();
+ }
+ }
+
+ private void checkSuspended() {
+ if (!cont.isPending()) {
+ throw new IllegalStateException();
+ }
+ }
+
// these methods are called by the runtime, not part of AsyncResponse
- public void suspend() {
- cont.setObject(this);
+ public synchronized void suspend() {
+ checkCancelled();
cont.suspend(timeout);
}
- public Object getResponseObject() {
- return responseObject;
+ public synchronized Object getResponseObject() {
+ // it may have to be set to true only after a continuation-specific onComplete event
+ done = true;
+ return cont.getObject();
+ }
+
+ public synchronized boolean isResumedByApplication() {
+ return resumedByApplication;
+ }
+
+ public synchronized boolean handleTimeout() {
+ if (!resumedByApplication) {
+ if (newTimeoutRequested) {
+ newTimeoutRequested = false;
+ suspend();
+ return true;
+ } else if (timeoutHandler != null) {
+ suspend();
+ timeoutHandler.handleTimeout(this);
+ return true;
+ } else {
+ done = true;
+ }
+ }
+ return false;
+
}
}
Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java?rev=1392272&r1=1392271&r2=1392272&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java Mon Oct 1 10:41:59 2012
@@ -31,6 +31,7 @@ import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
@Path("/bookstore")
public class BookContinuationStore {
@@ -44,6 +45,20 @@ public class BookContinuationStore {
}
@GET
+ @Path("/books/defaulttimeout")
+ public void getBookDescriptionWithHandler(AsyncResponse async) {
+ async.setTimeout(2000, TimeUnit.MILLISECONDS);
+ }
+
+ //Not currently invoked
+ @GET
+ @Path("/books/timeouthandler/{id}")
+ public void getBookDescriptionWithHandler(@PathParam("id") String id, AsyncResponse async) {
+ async.setTimeout(2000, TimeUnit.MILLISECONDS);
+ async.setTimeoutHandler(new TimeoutHandlerImpl(id));
+ }
+
+ @GET
@Path("/books/{id}")
public void getBookDescription(@PathParam("id") String id, AsyncResponse async) {
handleContinuationRequest(id, async);
@@ -87,6 +102,20 @@ public class BookContinuationStore {
books.put("5", "CXF in Action5");
}
+ private class TimeoutHandlerImpl implements TimeoutHandler {
+
+ private String id;
+
+ public TimeoutHandlerImpl(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public void handleTimeout(AsyncResponse asyncResponse) {
+ asyncResponse.resume(books.get(id));
+ }
+
+ }
}
Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSContinuationsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSContinuationsTest.java?rev=1392272&r1=1392271&r2=1392272&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSContinuationsTest.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSContinuationsTest.java Mon Oct 1 10:41:59 2012
@@ -24,8 +24,12 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import javax.ws.rs.core.Response;
+
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
@@ -47,6 +51,14 @@ public class JAXRSContinuationsTest exte
}
@Test
+ public void testDefaultTimeout() throws Exception {
+ WebClient wc = WebClient.create("http://localhost:" + PORT + "/bookstore/books/defaulttimeout");
+ WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(1000000L);
+ Response r = wc.get();
+ assertEquals(503, r.getStatus());
+ }
+
+ @Test
public void testContinuation() throws Exception {
doTestContinuation("books");
@@ -58,6 +70,12 @@ public class JAXRSContinuationsTest exte
doTestContinuation("books/subresources");
}
+ @Test
+ public void testContinuationWithTimeHandler() throws Exception {
+
+ doTestContinuation("books/timeouthandler");
+ }
+
private void doTestContinuation(String pathSegment) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
@@ -94,7 +112,7 @@ public class JAXRSContinuationsTest exte
int result = httpclient.executeMethod(get);
assertEquals(200, result);
assertEquals("Book description for id " + id + " is wrong",
- expected, get.getResponseBodyAsString());
+ expected, IOUtils.toString(get.getResponseBodyAsStream()));
} finally {
// Release current connection to the connection pool once you are done
get.releaseConnection();