You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ni...@apache.org on 2010/09/09 08:07:45 UTC
svn commit: r995320 - in /cxf/trunk:
api/src/main/java/org/apache/cxf/interceptor/
api/src/main/java/org/apache/cxf/phase/
api/src/test/java/org/apache/cxf/phase/
rt/core/src/main/java/org/apache/cxf/transport/
rt/transports/http-jetty/src/main/java/or...
Author: ningjiang
Date: Thu Sep 9 06:07:44 2010
New Revision: 995320
URL: http://svn.apache.org/viewvc?rev=995320&view=rev
Log:
CXF-2982 Don't throw the SuspendedInvocationException when calling the suspend() method of a CXF continuation
Modified:
cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/InterceptorChain.java
cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java
cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java
Modified: cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/InterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/InterceptorChain.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/InterceptorChain.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/InterceptorChain.java Thu Sep 9 06:07:44 2010
@@ -37,6 +37,7 @@ public interface InterceptorChain extend
enum State {
PAUSED,
+ SUSPENDED,
EXECUTING,
COMPLETE,
ABORTED,
@@ -68,6 +69,8 @@ public interface InterceptorChain extend
void pause();
+ void suspend();
+
void resume();
void reset();
Modified: cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Thu Sep 9 06:07:44 2010
@@ -155,7 +155,7 @@ public class PhaseInterceptorChain imple
}
// this method should really be on the InterceptorChain interface
- public State getState() {
+ public synchronized State getState() {
return state;
}
@@ -212,9 +212,13 @@ public class PhaseInterceptorChain imple
public synchronized void pause() {
state = State.PAUSED;
}
+
+ public synchronized void suspend() {
+ state = State.SUSPENDED;
+ }
public synchronized void resume() {
- if (state == State.PAUSED) {
+ if (state == State.PAUSED || state == State.SUSPENDED) {
state = State.EXECUTING;
doIntercept(pausedMessage);
}
@@ -242,6 +246,11 @@ public class PhaseInterceptorChain imple
}
//System.out.println("-----------" + currentInterceptor);
currentInterceptor.handleMessage(message);
+ if (state == State.SUSPENDED) {
+ // throw the exception to make sure thread exit without interrupt
+ throw new SuspendedInvocationException();
+ }
+
} catch (SuspendedInvocationException ex) {
// we need to resume from the same interceptor the exception got originated from
if (iterator.hasPrevious()) {
Modified: cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java (original)
+++ cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java Thu Sep 9 06:07:44 2010
@@ -35,6 +35,7 @@ import org.apache.cxf.message.FaultMode;
import org.apache.cxf.message.Message;
import org.easymock.classextension.EasyMock;
import org.easymock.classextension.IMocksControl;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -73,6 +74,7 @@ public class PhaseInterceptorChainTest e
@Test
public void testState() throws Exception {
AbstractPhaseInterceptor p = setUpPhaseInterceptor("phase1", "p1");
+
control.replay();
chain.add(p);
@@ -103,6 +105,7 @@ public class PhaseInterceptorChainTest e
chain.add(p1);
chain.add(p2);
+
try {
chain.doIntercept(message);
fail("Suspended invocation swallowed");
@@ -529,7 +532,7 @@ public class PhaseInterceptorChainTest e
}
public void handleMessage(Message m) {
- throw new SuspendedInvocationException(new Throwable());
+ m.getInterceptorChain().suspend();
}
}
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java Thu Sep 9 06:07:44 2010
@@ -64,8 +64,10 @@ public class ChainInitiationObserver imp
if (m.getInterceptorChain() instanceof PhaseInterceptorChain) {
phaseChain = (PhaseInterceptorChain)m.getInterceptorChain();
+ // To make sure the phase chain is run by one thread once
synchronized (phaseChain) {
- if (phaseChain.getState() == InterceptorChain.State.PAUSED) {
+ if (phaseChain.getState() == InterceptorChain.State.PAUSED
+ || phaseChain.getState() == InterceptorChain.State.SUSPENDED) {
phaseChain.resume();
return;
}
@@ -110,6 +112,7 @@ public class ChainInitiationObserver imp
addToChain(phaseChain, message);
phaseChain.doIntercept(message);
+
} finally {
BusFactory.setThreadDefaultBus(origBus);
}
Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java Thu Sep 9 06:07:44 2010
@@ -44,11 +44,16 @@ public class JettyContinuationProvider i
return getContinuation(true);
}
public JettyContinuationWrapper getContinuation(boolean create) {
- if (inMessage.getExchange().isOneWay()) {
- return null;
+ Message m = inMessage;
+ // Get the real message which is used in the interceptor chain
+ if (m != null && m.getExchange() != null && m.getExchange().getInMessage() != null) {
+ m = m.getExchange().getInMessage();
}
+ if (m.getExchange().isOneWay()) {
+ return null;
+ }
if (wrapper == null && create) {
- wrapper = new JettyContinuationWrapper(request, response, inMessage);
+ wrapper = new JettyContinuationWrapper(request, response, m);
}
return wrapper;
}
Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java Thu Sep 9 06:07:44 2010
@@ -32,7 +32,7 @@ import org.eclipse.jetty.server.Request;
public class JettyContinuationWrapper implements Continuation, ContinuationListener {
boolean isNew;
boolean isResumed;
- boolean isPending = true;
+ boolean isPending;
Object obj;
private Message message;
@@ -80,13 +80,21 @@ public class JettyContinuationWrapper im
}
public void reset() {
+ context.complete();
+ obj = null;
}
public boolean suspend(long timeout) {
+ if (isPending) {
+ return false;
+ }
context.setTimeout(timeout);
isNew = false;
- throw new org.apache.cxf.continuations.SuspendedInvocationException();
+ // Need to get the right message which is handled in the interceptor chain
+ message.getExchange().getInMessage().getInterceptorChain().suspend();
+ isPending = true;
+ return true;
}
protected Message getMessage() {
@@ -104,6 +112,7 @@ public class JettyContinuationWrapper im
}
public void onTimeout(org.eclipse.jetty.continuation.Continuation continuation) {
+ isPending = false;
context.dispatch();
}
Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java Thu Sep 9 06:07:44 2010
@@ -60,7 +60,7 @@ public class Servlet3ContinuationProvide
AsyncContext context;
boolean isNew;
boolean isResumed;
- boolean isPending = true;
+ boolean isPending;
Object obj;
public Servlet3Continuation() {
@@ -76,9 +76,16 @@ public class Servlet3ContinuationProvide
}
public boolean suspend(long timeout) {
+ if (isPending) {
+ return false;
+ }
context.setTimeout(timeout);
isNew = false;
- throw new org.apache.cxf.continuations.SuspendedInvocationException();
+ // Need to get the right message which is handled in the interceptor chain
+ inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
+
+ isPending = true;
+ return true;
}
public void redispatch() {
context.dispatch();
@@ -89,6 +96,8 @@ public class Servlet3ContinuationProvide
}
public void reset() {
+ context.complete();
+ obj = null;
}
public boolean isNew() {
@@ -121,6 +130,7 @@ public class Servlet3ContinuationProvide
public void onStartAsync(AsyncEvent event) throws IOException {
}
public void onTimeout(AsyncEvent event) throws IOException {
+ isPending = false;
redispatch();
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java Thu Sep 9 06:07:44 2010
@@ -26,7 +26,6 @@ import java.util.TimerTask;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.continuations.Continuation;
-import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.jms.JMSConfiguration;
@@ -82,6 +81,7 @@ public class JMSContinuation implements
isNew = true;
isPending = false;
isResumed = false;
+ userObject = null;
}
public void resume() {
@@ -94,9 +94,7 @@ public class JMSContinuation implements
}
protected void doResume() {
-
updateContinuations(true);
-
BusFactory.setThreadDefaultBus(bus);
try {
incomingObserver.onMessage(inMessage);
@@ -115,7 +113,8 @@ public class JMSContinuation implements
if (isPending) {
return false;
}
-
+ // Need to get the right message which is handled in the interceptor chain
+ inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
updateContinuations(false);
isNew = false;
@@ -125,8 +124,7 @@ public class JMSContinuation implements
if (timeout > 0) {
createTimerTask(timeout);
}
-
- throw new SuspendedInvocationException();
+ return true;
}
protected void createTimerTask(long timeout) {
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java Thu Sep 9 06:07:44 2010
@@ -54,6 +54,7 @@ public class JMSContinuationProvider imp
public Continuation getContinuation() {
Message m = inMessage;
+ // Get the real message which is used in the interceptor chain
if (m != null && m.getExchange() != null && m.getExchange().getInMessage() != null) {
m = m.getExchange().getInMessage();
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java Thu Sep 9 06:07:44 2010
@@ -24,7 +24,10 @@ import java.util.List;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
-import org.apache.cxf.continuations.SuspendedInvocationException;
+
+import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.transport.MessageObserver;
@@ -48,6 +51,10 @@ public class JMSContinuationTest extends
@Before
public void setUp() {
m = new MessageImpl();
+ Exchange exchange = new ExchangeImpl();
+ m.setExchange(exchange);
+ m.setInterceptorChain(EasyMock.createMock(InterceptorChain.class));
+ exchange.setInMessage(m);
continuations = new LinkedList<JMSContinuation>();
b = BusFactory.getDefaultBus();
observer = EasyMock.createMock(MessageObserver.class);
@@ -66,12 +73,9 @@ public class JMSContinuationTest extends
public void testSuspendResume() {
TestJMSContinuationWrapper cw =
new TestJMSContinuationWrapper(b, m, observer, continuations, null, new JMSConfiguration());
- try {
- cw.suspend(5000);
- fail("SuspendInvocation exception expected");
- } catch (SuspendedInvocationException ex) {
- // ignore
- }
+
+ cw.suspend(5000);
+
assertFalse(cw.isNew());
assertTrue(cw.isPending());
assertFalse(cw.isResumed());
@@ -122,12 +126,8 @@ public class JMSContinuationTest extends
private void suspendResumeCheckStartAndStop(JMSContinuation cw, JMSConfiguration config,
DefaultMessageListenerContainerStub springContainer) {
- try {
- cw.suspend(5000);
- fail("SuspendInvocation exception expected");
- } catch (SuspendedInvocationException ex) {
- // ignore
- }
+ cw.suspend(5000);
+
assertEquals(continuations.size(), 1);
assertSame(continuations.get(0), cw);
assertTrue(springContainer.isStop());
Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java (original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java Thu Sep 9 06:07:44 2010
@@ -38,7 +38,6 @@ import org.w3c.dom.Node;
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.ContinuationProvider;
-import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.helpers.DOMUtils;
@@ -78,35 +77,25 @@ public class HWSoapMessageDocProvider im
ContinuationProvider contProvider =
(ContinuationProvider) messageContext.get(ContinuationProvider.class.getName());
final Continuation continuation = contProvider.getContinuation();
- synchronized (continuation) {
- if (continuation.isNew()) {
-
- new Thread(new Runnable() {
-
- public void run() {
- try {
- synchronized (continuation) {
- continuation.resume();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
+
+ if (continuation.isNew()) {
+ continuation.suspend(5000);
+ new Thread(new Runnable() {
+ public void run() {
+ try {
+ continuation.resume();
+ } catch (Exception e) {
+ e.printStackTrace();
}
-
- }).start();
-
- continuation.suspend(5000);
- throw new RuntimeException("The continuation provider doesn't "
- + "support asynchronous continuations");
-
- } else if (!continuation.isResumed()) {
- throw new RuntimeException("time out");
- } else {
- return resumeMessage(request);
- }
+ }
+ }).start();
+ return null;
+ } else if (!continuation.isResumed()) {
+ continuation.reset();
+ throw new RuntimeException("time out");
+ } else {
+ return resumeMessage(request);
}
- } catch (SuspendedInvocationException e) {
- throw e;
} catch (SOAPFaultException e) {
throw e;
}