You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2013/11/15 14:23:58 UTC
svn commit: r1542259 - in /cxf/trunk:
core/src/main/java/org/apache/cxf/interceptor/
core/src/main/java/org/apache/cxf/phase/
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/
systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/
Author: sergeyb
Date: Fri Nov 15 13:23:57 2013
New Revision: 1542259
URL: http://svn.apache.org/r1542259
Log:
[CXF-5373] Initial attempt at PhaseInterceptorChain.getCurrentMessage() working in the custom executor threads
Modified:
cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java
cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
Modified: cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java?rev=1542259&r1=1542258&r2=1542259&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java Fri Nov 15 13:23:57 2013
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.message.Exchange;
@@ -102,15 +103,20 @@ public class ServiceInvokerInterceptor e
// executor thread is done
final PhaseInterceptorChain chain = (PhaseInterceptorChain)message.getInterceptorChain();
+ final AtomicBoolean contextSwitched = new AtomicBoolean();
final FutureTask<Object> o = new FutureTask<Object>(invocation, null) {
@Override
protected void done() {
super.done();
+ if (contextSwitched.get()) {
+ PhaseInterceptorChain.setCurrentMessage(chain, null);
+ }
chain.releaseChain();
}
@Override
public void run() {
+ contextSwitched.getAndSet(PhaseInterceptorChain.setCurrentMessage(chain, message));
synchronized (chain) {
super.run();
}
@@ -132,6 +138,12 @@ public class ServiceInvokerInterceptor e
} else {
throw new Fault(e.getCause());
}
+ } catch (Exception e) {
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException)e.getCause();
+ } else {
+ throw new Fault(e.getCause());
+ }
}
}
Modified: cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=1542259&r1=1542258&r2=1542259&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Fri Nov 15 13:23:57 2013
@@ -37,6 +37,7 @@ import org.apache.cxf.continuations.Susp
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.interceptor.ServiceInvokerInterceptor;
import org.apache.cxf.logging.FaultListener;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.FaultMode;
@@ -157,6 +158,20 @@ public class PhaseInterceptorChain imple
return CURRENT_MESSAGE.get();
}
+ public static boolean setCurrentMessage(PhaseInterceptorChain chain, Message m) {
+ if (getCurrentMessage() == m) {
+ return false;
+ }
+ if (chain.iterator.hasPrevious()) {
+ chain.iterator.previous();
+ if (chain.iterator.next() instanceof ServiceInvokerInterceptor) {
+ CURRENT_MESSAGE.set(m);
+ return true;
+ }
+ }
+ throw new IllegalStateException("Only ServiceInvokerInterceptor can update the current chain message");
+ }
+
public synchronized State getState() {
return state;
}
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java?rev=1542259&r1=1542258&r2=1542259&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java Fri Nov 15 13:23:57 2013
@@ -103,7 +103,7 @@ public class JAXRSInvoker extends Abstra
}
return handleFault(ex, exchange.getInMessage());
} finally {
- boolean suspended = exchange.getInMessage().getInterceptorChain().getState() == State.SUSPENDED;
+ boolean suspended = isSuspended(exchange);
if (exchange.isOneWay() || suspended) {
ServerProviderFactory.getInstance(exchange.getInMessage()).clearThreadLocalProxies();
}
@@ -115,6 +115,10 @@ public class JAXRSInvoker extends Abstra
}
}
+ private boolean isSuspended(Exchange exchange) {
+ return exchange.getInMessage().getInterceptorChain().getState() == State.SUSPENDED;
+ }
+
private Object handleAsyncResponse(Exchange exchange, AsyncResponseImpl ar) {
Object asyncObj = ar.getResponseObject();
if (asyncObj instanceof Throwable) {
@@ -130,6 +134,10 @@ public class JAXRSInvoker extends Abstra
return handleFault(new Fault(t), exchange.getInMessage(), null, null);
} catch (Fault ex) {
ar.setUnmappedThrowable(ex.getCause());
+ if (isSuspended(exchange)) {
+ ar.reset();
+ exchange.getInMessage().getInterceptorChain().unpause();
+ }
return new MessageContentsList(Response.serverError().build());
}
}
Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java?rev=1542259&r1=1542258&r2=1542259&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/AbstractJAXRSContinuationsTest.java Fri Nov 15 13:23:57 2013
@@ -59,6 +59,13 @@ public abstract class AbstractJAXRSConti
}
@Test
+ public void testUnmappedAfterTimeout() throws Exception {
+ WebClient wc = WebClient.create("http://localhost:" + getPort() + getBaseAddress() + "/books/suspend/unmapped");
+ Response r = wc.get();
+ assertEquals(500, r.getStatus());
+ }
+
+ @Test
public void testImmediateResumeSubresource() throws Exception {
WebClient wc = WebClient.create("http://localhost:" + getPort()
+ getBaseAddress() + "/books/subresources/books/resume");
Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java?rev=1542259&r1=1542258&r2=1542259&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java Fri Nov 15 13:23:57 2013
@@ -37,6 +37,8 @@ import javax.ws.rs.container.AsyncRespon
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.TimeoutHandler;
+import org.apache.cxf.phase.PhaseInterceptorChain;
+
@Path("/bookstore")
public class BookContinuationStore {
@@ -65,6 +67,7 @@ public class BookContinuationStore {
@GET
@Path("/books/cancel")
public void getBookDescriptionWithCancel(@PathParam("id") String id, AsyncResponse async) {
+ PhaseInterceptorChain.getCurrentMessage().getClass();
async.setTimeout(2000, TimeUnit.MILLISECONDS);
async.setTimeoutHandler(new CancelTimeoutHandlerImpl());
}
@@ -118,6 +121,15 @@ public class BookContinuationStore {
resumeSuspendedNotFoundUnmapped(response);
}
+ @GET
+ @Path("books/suspend/unmapped")
+ @Produces("text/plain")
+ public void handleNotMappedAfterSuspend(AsyncResponse response) throws BookNotFoundFault {
+ response.setTimeout(2000, TimeUnit.MILLISECONDS);
+ response.setTimeoutHandler(new CancelTimeoutHandlerImpl());
+ throw new BookNotFoundFault("");
+ }
+
private void resumeSuspended(final String id, final AsyncResponse response) {