You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2013/11/15 14:23:58 UTC

svn commit: r1542259 - in /cxf/trunk: core/src/main/java/org/apache/cxf/interceptor/ core/src/main/java/org/apache/cxf/phase/ rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/

Author: sergeyb
Date: Fri Nov 15 13:23:57 2013
New Revision: 1542259

URL: http://svn.apache.org/r1542259
Log:
[CXF-5373] Initial attempt at PhaseInterceptorChain.getCurrentMessage() working in the custom executor threads

Modified:
    cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
    cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
    cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
    cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java
    cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java

Modified: cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java?rev=1542259&r1=1542258&r2=1542259&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java Fri Nov 15 13:23:57 2013
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.message.Exchange;
@@ -102,15 +103,20 @@ public class ServiceInvokerInterceptor e
             // executor thread is done
             
             final PhaseInterceptorChain chain = (PhaseInterceptorChain)message.getInterceptorChain();
+            final AtomicBoolean contextSwitched = new AtomicBoolean(); 
             final FutureTask<Object> o = new FutureTask<Object>(invocation, null) {
                 @Override
                 protected void done() {
                     super.done();
+                    if (contextSwitched.get()) {
+                        PhaseInterceptorChain.setCurrentMessage(chain, null);
+                    }
                     chain.releaseChain();
                 }
                 
                 @Override
                 public void run() {
+                    contextSwitched.getAndSet(PhaseInterceptorChain.setCurrentMessage(chain, message));
                     synchronized (chain) {
                         super.run();
                     }
@@ -132,6 +138,12 @@ public class ServiceInvokerInterceptor e
                 } else {
                     throw new Fault(e.getCause());
                 }
+            } catch (Exception e) {
+                if (e.getCause() instanceof RuntimeException) {
+                    throw (RuntimeException)e.getCause();
+                } else {
+                    throw new Fault(e.getCause());
+                }
             }
             
         }

Modified: cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=1542259&r1=1542258&r2=1542259&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Fri Nov 15 13:23:57 2013
@@ -37,6 +37,7 @@ import org.apache.cxf.continuations.Susp
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.interceptor.ServiceInvokerInterceptor;
 import org.apache.cxf.logging.FaultListener;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.FaultMode;
@@ -157,6 +158,20 @@ public class PhaseInterceptorChain imple
         return CURRENT_MESSAGE.get();
     }
     
+    public static boolean setCurrentMessage(PhaseInterceptorChain chain, Message m) {
+        if (getCurrentMessage() == m) { 
+            return false;
+        }
+        if (chain.iterator.hasPrevious()) {
+            chain.iterator.previous();
+            if (chain.iterator.next() instanceof ServiceInvokerInterceptor) {
+                CURRENT_MESSAGE.set(m);
+                return true;
+            }
+        }
+        throw new IllegalStateException("Only ServiceInvokerInterceptor can update the current chain message");
+    }
+    
     public synchronized State getState() {
         return state;
     }

Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java?rev=1542259&r1=1542258&r2=1542259&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java Fri Nov 15 13:23:57 2013
@@ -103,7 +103,7 @@ public class JAXRSInvoker extends Abstra
             }
             return handleFault(ex, exchange.getInMessage());
         } finally {
-            boolean suspended = exchange.getInMessage().getInterceptorChain().getState() == State.SUSPENDED;
+            boolean suspended = isSuspended(exchange);
             if (exchange.isOneWay() || suspended) {
                 ServerProviderFactory.getInstance(exchange.getInMessage()).clearThreadLocalProxies();
             }
@@ -115,6 +115,10 @@ public class JAXRSInvoker extends Abstra
         }
     }
 
+    private boolean isSuspended(Exchange exchange) {
+        return exchange.getInMessage().getInterceptorChain().getState() == State.SUSPENDED;
+    }
+    
     private Object handleAsyncResponse(Exchange exchange, AsyncResponseImpl ar) {
         Object asyncObj = ar.getResponseObject();
         if (asyncObj instanceof Throwable) {
@@ -130,6 +134,10 @@ public class JAXRSInvoker extends Abstra
             return handleFault(new Fault(t), exchange.getInMessage(), null, null);
         } catch (Fault ex) {
             ar.setUnmappedThrowable(ex.getCause());
+            if (isSuspended(exchange)) {
+                ar.reset();
+                exchange.getInMessage().getInterceptorChain().unpause();
+            }
             return new MessageContentsList(Response.serverError().build());
         }
     }

Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java?rev=1542259&r1=1542258&r2=1542259&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java Fri Nov 15 13:23:57 2013
@@ -59,6 +59,13 @@ public abstract class AbstractJAXRSConti
     }
     
     @Test
+    public void testUnmappedAfterTimeout() throws Exception {
+        WebClient wc = WebClient.create("http://localhost:" + getPort() + getBaseAddress() + "/books/suspend/unmapped");
+        Response r = wc.get();
+        assertEquals(500, r.getStatus());
+    }
+    
+    @Test
     public void testImmediateResumeSubresource() throws Exception {
         WebClient wc = WebClient.create("http://localhost:" + getPort() 
                                         + getBaseAddress() + "/books/subresources/books/resume");

Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java?rev=1542259&r1=1542258&r2=1542259&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java Fri Nov 15 13:23:57 2013
@@ -37,6 +37,8 @@ import javax.ws.rs.container.AsyncRespon
 import javax.ws.rs.container.CompletionCallback;
 import javax.ws.rs.container.TimeoutHandler;
 
+import org.apache.cxf.phase.PhaseInterceptorChain;
+
 @Path("/bookstore")
 public class BookContinuationStore {
 
@@ -65,6 +67,7 @@ public class BookContinuationStore {
     @GET
     @Path("/books/cancel")
     public void getBookDescriptionWithCancel(@PathParam("id") String id, AsyncResponse async) {
+        PhaseInterceptorChain.getCurrentMessage().getClass();
         async.setTimeout(2000, TimeUnit.MILLISECONDS);
         async.setTimeoutHandler(new CancelTimeoutHandlerImpl());
     }
@@ -118,6 +121,15 @@ public class BookContinuationStore {
         resumeSuspendedNotFoundUnmapped(response);
     }
     
+    @GET
+    @Path("books/suspend/unmapped")
+    @Produces("text/plain")
+    public void handleNotMappedAfterSuspend(AsyncResponse response) throws BookNotFoundFault {
+        response.setTimeout(2000, TimeUnit.MILLISECONDS);
+        response.setTimeoutHandler(new CancelTimeoutHandlerImpl());
+        throw new BookNotFoundFault("");
+    }
+    
     
     private void resumeSuspended(final String id, final AsyncResponse response) {