You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2012/10/01 12:42:00 UTC

svn commit: r1392272 - in /cxf/trunk: rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/

Author: sergeyb
Date: Mon Oct  1 10:41:59 2012
New Revision: 1392272

URL: http://svn.apache.org/viewvc?rev=1392272&view=rev
Log:
[CXF-4455] More AsyncResponse related code

Modified:
    cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
    cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
    cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
    cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSContinuationsTest.java

Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java?rev=1392272&r1=1392271&r2=1392272&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java Mon Oct  1 10:41:59 2012
@@ -82,15 +82,22 @@ public class JAXRSInvoker extends Abstra
             AsyncResponse asyncResp = exchange.get(AsyncResponse.class);
             if (asyncResp != null) {
                 AsyncResponseImpl asyncImpl = (AsyncResponseImpl)asyncResp;
-                Object asyncObj = asyncImpl.getResponseObject();
-                if (!(asyncObj instanceof Response)) {
+                
+                if (asyncImpl.isResumedByApplication()) {
+                    Object asyncObj = asyncImpl.getResponseObject();
                     if (asyncObj instanceof Throwable) {
-                        return handleFault(new Fault((Throwable)asyncObj), exchange.getInMessage(), null, null);    
-                    } else {
+                        return handleFault(new Fault((Throwable)asyncObj), 
+                                           exchange.getInMessage(), null, null);    
+                    } else if (!(asyncObj instanceof Response)) {
                         response = Response.ok().entity(asyncObj).build();
+                    } else {
+                        response = (Response)asyncObj;
                     }
+                } else if (asyncImpl.handleTimeout()) {
+                    return null;
                 } else {
-                    response = (Response)asyncObj;
+                    return handleFault(new Fault(new WebApplicationException(503)), 
+                                       exchange.getInMessage(), null, null);
                 }
             }
         }

Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java?rev=1392272&r1=1392271&r2=1392272&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java Mon Oct  1 10:41:59 2012
@@ -25,20 +25,24 @@ import javax.ws.rs.container.AsyncRespon
 import javax.ws.rs.container.TimeoutHandler;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
 
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.jaxrs.utils.HttpUtils;
 import org.apache.cxf.message.Message;
 
 
 public class AsyncResponseImpl implements AsyncResponse {
     
     private Continuation cont;
-    private Object responseObject;
-    private long timeout = 5000;
+    private long timeout = AsyncResponse.NO_TIMEOUT;
     private Message inMessage;
-    private boolean suspended;
     private boolean cancelled;
+    private boolean done;
+    private boolean newTimeoutRequested;
+    private boolean resumedByApplication;
+    private TimeoutHandler timeoutHandler;
     public AsyncResponseImpl(Message inMessage) {
         ContinuationProvider provider = 
             (ContinuationProvider)inMessage.get(ContinuationProvider.class.getName());
@@ -58,55 +62,68 @@ public class AsyncResponseImpl implement
         doResume(response);
     }
     
-    private void doResume(Object response) throws IllegalStateException {
-        responseObject = response;
+    private synchronized void doResume(Object response) throws IllegalStateException {
+        checkCancelled();
+        checkSuspended();
         inMessage.getExchange().put(AsyncResponse.class, this);
-        suspended = false;
+        cont.setObject(response);
+        resumedByApplication = true;
         cont.resume();
     }
     
     @Override
     public void cancel() {
-        cancel(-1);
+        doCancel(null);
     }
 
     @Override
-    //TODO: has to be long
     public void cancel(int retryAfter) {
-        cancelled = true;
-        doResume(Response.status(503).header(HttpHeaders.RETRY_AFTER, Integer.toString(retryAfter)).build());
+        doCancel(Integer.toString(retryAfter));
     }
 
     @Override
     public void cancel(Date retryAfter) {
-        cancel((int)(retryAfter.getTime() - new Date().getTime()));
+        doCancel(HttpUtils.getHttpDateFormat().format(retryAfter));
+    }
+    
+    private synchronized void doCancel(String retryAfterHeader) {
+        checkSuspended();
+        cancelled = true;
+        ResponseBuilder rb = Response.status(503);
+        if (retryAfterHeader != null) {
+            rb.header(HttpHeaders.RETRY_AFTER, retryAfterHeader);
+        }
+        doResume(rb.build());
     }
 
     @Override
-    public boolean isSuspended() {
-        return suspended;
+    public synchronized boolean isSuspended() {
+        return cont.isPending();
     }
 
     @Override
-    public boolean isCancelled() {
+    public synchronized boolean isCancelled() {
         return cancelled;
     }
 
     @Override
-    public boolean isDone() {
-        // TODO Auto-generated method stub
-        return false;
+    public synchronized boolean isDone() {
+        return done;
     }
 
     @Override
-    public void setTimeout(long time, TimeUnit unit) throws IllegalStateException {
-        // TODO Auto-generated method stub
+    public synchronized void setTimeout(long time, TimeUnit unit) throws IllegalStateException {
+        checkCancelled();
+        checkSuspended();
+        inMessage.getExchange().put(AsyncResponse.class, this);
+        timeout = unit.convert(time, TimeUnit.MILLISECONDS);
+        newTimeoutRequested = true;
+        cont.resume();
     }
 
     @Override
     public void setTimeoutHandler(TimeoutHandler handler) {
-        // TODO Auto-generated method stub
-        
+        timeoutHandler = handler;
     }
 
     @Override
@@ -133,13 +150,49 @@ public class AsyncResponseImpl implement
         return null;
     }
     
+    private void checkCancelled() {
+        if (cancelled) {
+            throw new IllegalStateException();
+        }
+    }
+    
+    private void checkSuspended() {
+        if (!cont.isPending()) {
+            throw new IllegalStateException();
+        }
+    }
+    
     // these methods are called by the runtime, not part of AsyncResponse    
-    public void suspend() {
-        cont.setObject(this);
+    public synchronized void suspend() {
+        checkCancelled();
         cont.suspend(timeout);
     }
     
-    public Object getResponseObject() {
-        return responseObject;
+    public synchronized Object getResponseObject() {
+        // it may have to be set to true only after a continuation-specific onComplete event
+        done = true;
+        return cont.getObject();
+    }
+    
+    public synchronized boolean isResumedByApplication() {
+        return resumedByApplication;
+    }
+    
+    public synchronized boolean handleTimeout() {
+        if (!resumedByApplication) {
+            if (newTimeoutRequested) {
+                newTimeoutRequested = false;
+                suspend();
+                return true;
+            } else if (timeoutHandler != null) {
+                suspend();
+                timeoutHandler.handleTimeout(this);
+                return true;
+            } else {
+                done = true;
+            }
+        }
+        return false;
+        
     }
 }

Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java?rev=1392272&r1=1392271&r2=1392272&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java Mon Oct  1 10:41:59 2012
@@ -31,6 +31,7 @@ import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
 
 @Path("/bookstore")
 public class BookContinuationStore {
@@ -44,6 +45,20 @@ public class BookContinuationStore {
     }
     
     @GET
+    @Path("/books/defaulttimeout")
+    public void getBookDescriptionWithHandler(AsyncResponse async) {
+        async.setTimeout(2000, TimeUnit.MILLISECONDS);
+    }
+    
+    //Not currently invoked
+    @GET
+    @Path("/books/timeouthandler/{id}")
+    public void getBookDescriptionWithHandler(@PathParam("id") String id, AsyncResponse async) {
+        async.setTimeout(2000, TimeUnit.MILLISECONDS);
+        async.setTimeoutHandler(new TimeoutHandlerImpl(id));
+    }
+    
+    @GET
     @Path("/books/{id}")
     public void getBookDescription(@PathParam("id") String id, AsyncResponse async) {
         handleContinuationRequest(id, async);
@@ -87,6 +102,20 @@ public class BookContinuationStore {
         books.put("5", "CXF in Action5");
     }
      
+    private class TimeoutHandlerImpl implements TimeoutHandler {
+
+        private String id;
+        
+        public TimeoutHandlerImpl(String id) {
+            this.id = id;
+        }
+        
+        @Override
+        public void handleTimeout(AsyncResponse asyncResponse) {
+            asyncResponse.resume(books.get(id));
+        }
+        
+    }
 }
 
 

Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSContinuationsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSContinuationsTest.java?rev=1392272&r1=1392271&r2=1392272&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSContinuationsTest.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSContinuationsTest.java Mon Oct  1 10:41:59 2012
@@ -24,8 +24,12 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.core.Response;
+
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
@@ -47,6 +51,14 @@ public class JAXRSContinuationsTest exte
     }
     
     @Test
+    public void testDefaultTimeout() throws Exception {
+        WebClient wc = WebClient.create("http://localhost:" + PORT + "/bookstore/books/defaulttimeout");
+        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(1000000L);
+        Response r = wc.get();
+        assertEquals(503, r.getStatus());
+    }
+    
+    @Test
     public void testContinuation() throws Exception {
         
         doTestContinuation("books");
@@ -58,6 +70,12 @@ public class JAXRSContinuationsTest exte
         doTestContinuation("books/subresources");
     }
     
+    @Test
+    public void testContinuationWithTimeHandler() throws Exception {
+        
+        doTestContinuation("books/timeouthandler");
+    }
+    
     private void doTestContinuation(String pathSegment) throws Exception {
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                                              new ArrayBlockingQueue<Runnable>(10));
@@ -94,7 +112,7 @@ public class JAXRSContinuationsTest exte
             int result = httpclient.executeMethod(get);
             assertEquals(200, result);
             assertEquals("Book description for id " + id + " is wrong",
-                         expected, get.getResponseBodyAsString());
+                         expected, IOUtils.toString(get.getResponseBodyAsStream()));
         } finally {
             // Release current connection to the connection pool once you are done
             get.releaseConnection();