You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ni...@apache.org on 2010/09/09 08:07:45 UTC

svn commit: r995320 - in /cxf/trunk: api/src/main/java/org/apache/cxf/interceptor/ api/src/main/java/org/apache/cxf/phase/ api/src/test/java/org/apache/cxf/phase/ rt/core/src/main/java/org/apache/cxf/transport/ rt/transports/http-jetty/src/main/java/or...

Author: ningjiang
Date: Thu Sep  9 06:07:44 2010
New Revision: 995320

URL: http://svn.apache.org/viewvc?rev=995320&view=rev
Log:
CXF-2982 Don't throw the SuspendedInvocationException when call the suspend() method of CXF continuation

Modified:
    cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/InterceptorChain.java
    cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
    cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
    cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/InterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/InterceptorChain.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/InterceptorChain.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/interceptor/InterceptorChain.java Thu Sep  9 06:07:44 2010
@@ -37,6 +37,7 @@ public interface InterceptorChain extend
     
     enum State {
         PAUSED,
+        SUSPENDED,
         EXECUTING,
         COMPLETE,
         ABORTED,
@@ -68,6 +69,8 @@ public interface InterceptorChain extend
 
     void pause();
     
+    void suspend();
+    
     void resume();
     
     void reset();

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Thu Sep  9 06:07:44 2010
@@ -155,7 +155,7 @@ public class PhaseInterceptorChain imple
     }
     
     // this method should really be on the InterceptorChain interface
-    public State getState() {
+    public synchronized State getState() {
         return state;
     }
     
@@ -212,9 +212,13 @@ public class PhaseInterceptorChain imple
     public synchronized void pause() {
         state = State.PAUSED;
     }
+    
+    public synchronized void suspend() {
+        state = State.SUSPENDED;
+    }
 
     public synchronized void resume() {
-        if (state == State.PAUSED) {
+        if (state == State.PAUSED || state == State.SUSPENDED) {
             state = State.EXECUTING;
             doIntercept(pausedMessage);
         }
@@ -242,6 +246,11 @@ public class PhaseInterceptorChain imple
                     }
                     //System.out.println("-----------" + currentInterceptor);
                     currentInterceptor.handleMessage(message);
+                    if (state == State.SUSPENDED) {
+                         // throw the exception to make sure thread exit without interrupt
+                        throw new SuspendedInvocationException();
+                    }
+                    
                 } catch (SuspendedInvocationException ex) {
                     // we need to resume from the same interceptor the exception got originated from
                     if (iterator.hasPrevious()) {

Modified: cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java (original)
+++ cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java Thu Sep  9 06:07:44 2010
@@ -35,6 +35,7 @@ import org.apache.cxf.message.FaultMode;
 import org.apache.cxf.message.Message;
 import org.easymock.classextension.EasyMock;
 import org.easymock.classextension.IMocksControl;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -73,6 +74,7 @@ public class PhaseInterceptorChainTest e
     @Test
     public void testState() throws Exception {
         AbstractPhaseInterceptor p = setUpPhaseInterceptor("phase1", "p1");
+       
         control.replay();
         chain.add(p);
         
@@ -103,6 +105,7 @@ public class PhaseInterceptorChainTest e
         
         chain.add(p1);
         chain.add(p2);
+        
         try {
             chain.doIntercept(message);
             fail("Suspended invocation swallowed");
@@ -529,7 +532,7 @@ public class PhaseInterceptorChainTest e
         }
         
         public void handleMessage(Message m) {
-            throw new SuspendedInvocationException(new Throwable());
+            m.getInterceptorChain().suspend();
         }
     }
 

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java Thu Sep  9 06:07:44 2010
@@ -64,8 +64,10 @@ public class ChainInitiationObserver imp
             
             if (m.getInterceptorChain() instanceof PhaseInterceptorChain) {
                 phaseChain = (PhaseInterceptorChain)m.getInterceptorChain();
+                // To make sure the phase chain is run by one thread once
                 synchronized (phaseChain) {
-                    if (phaseChain.getState() == InterceptorChain.State.PAUSED) {
+                    if (phaseChain.getState() == InterceptorChain.State.PAUSED 
+                        || phaseChain.getState() == InterceptorChain.State.SUSPENDED) {
                         phaseChain.resume();
                         return;
                     }
@@ -110,6 +112,7 @@ public class ChainInitiationObserver imp
             addToChain(phaseChain, message);
             
             phaseChain.doIntercept(message);
+            
         } finally {
             BusFactory.setThreadDefaultBus(origBus);
         }

Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java Thu Sep  9 06:07:44 2010
@@ -44,11 +44,16 @@ public class JettyContinuationProvider i
         return getContinuation(true);
     }    
     public JettyContinuationWrapper getContinuation(boolean create) {
-        if (inMessage.getExchange().isOneWay()) {
-            return null;
+        Message m = inMessage;
+        // Get the real message which is used in the interceptor chain
+        if (m != null && m.getExchange() != null && m.getExchange().getInMessage() != null) {
+            m = m.getExchange().getInMessage();
         }
+        if (m.getExchange().isOneWay()) {
+            return null;
+        }        
         if (wrapper == null && create) {
-            wrapper = new JettyContinuationWrapper(request, response, inMessage);
+            wrapper = new JettyContinuationWrapper(request, response, m);
         }
         return wrapper;
     }

Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java Thu Sep  9 06:07:44 2010
@@ -32,7 +32,7 @@ import org.eclipse.jetty.server.Request;
 public class JettyContinuationWrapper implements Continuation, ContinuationListener {
     boolean isNew;
     boolean isResumed;
-    boolean isPending = true;
+    boolean isPending;
     Object obj;
     
     private Message message;
@@ -80,13 +80,21 @@ public class JettyContinuationWrapper im
     }
 
     public void reset() {
+        context.complete();
+        obj = null;
     }
 
 
     public boolean suspend(long timeout) {
+        if (isPending) {
+            return false;
+        }
         context.setTimeout(timeout);
         isNew = false;
-        throw new org.apache.cxf.continuations.SuspendedInvocationException();
+        // Need to get the right message which is handled in the interceptor chain
+        message.getExchange().getInMessage().getInterceptorChain().suspend();
+        isPending = true;
+        return true;
     }
     
     protected Message getMessage() {
@@ -104,6 +112,7 @@ public class JettyContinuationWrapper im
     }
 
     public void onTimeout(org.eclipse.jetty.continuation.Continuation continuation) {
+        isPending = false;
         context.dispatch();
     }
     

Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java Thu Sep  9 06:07:44 2010
@@ -60,7 +60,7 @@ public class Servlet3ContinuationProvide
         AsyncContext context;
         boolean isNew;
         boolean isResumed;
-        boolean isPending = true;
+        boolean isPending;
         Object obj;
         
         public Servlet3Continuation() {
@@ -76,9 +76,16 @@ public class Servlet3ContinuationProvide
         }
         
         public boolean suspend(long timeout) {
+            if (isPending) {
+                return false;
+            }
             context.setTimeout(timeout);
             isNew = false;
-            throw new org.apache.cxf.continuations.SuspendedInvocationException();
+            // Need to get the right message which is handled in the interceptor chain
+            inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
+                
+            isPending = true;
+            return true;
         }
         public void redispatch() {
             context.dispatch();
@@ -89,6 +96,8 @@ public class Servlet3ContinuationProvide
         }
 
         public void reset() {
+            context.complete();
+            obj = null;
         }
 
         public boolean isNew() {
@@ -121,6 +130,7 @@ public class Servlet3ContinuationProvide
         public void onStartAsync(AsyncEvent event) throws IOException {
         }
         public void onTimeout(AsyncEvent event) throws IOException {
+            isPending = false;
             redispatch();
         }
         

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java Thu Sep  9 06:07:44 2010
@@ -26,7 +26,6 @@ import java.util.TimerTask;
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.continuations.Continuation;
-import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.jms.JMSConfiguration;
@@ -82,6 +81,7 @@ public class JMSContinuation implements 
         isNew = true;
         isPending = false;
         isResumed = false;
+        userObject = null;
     }
 
     public void resume() {
@@ -94,9 +94,7 @@ public class JMSContinuation implements 
     }
     
     protected void doResume() {
-        
         updateContinuations(true);
-        
         BusFactory.setThreadDefaultBus(bus);
         try {
             incomingObserver.onMessage(inMessage);
@@ -115,7 +113,8 @@ public class JMSContinuation implements 
         if (isPending) {
             return false;
         }
-        
+        // Need to get the right message which is handled in the interceptor chain
+        inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
         updateContinuations(false);
                 
         isNew = false;
@@ -125,8 +124,7 @@ public class JMSContinuation implements 
         if (timeout > 0) {
             createTimerTask(timeout);
         }
-        
-        throw new SuspendedInvocationException();
+        return true;
     }
 
     protected void createTimerTask(long timeout) {

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java Thu Sep  9 06:07:44 2010
@@ -54,6 +54,7 @@ public class JMSContinuationProvider imp
     
     public Continuation getContinuation() {
         Message m = inMessage;
+        // Get the real message which is used in the interceptor chain
         if (m != null && m.getExchange() != null && m.getExchange().getInMessage() != null) {
             m = m.getExchange().getInMessage();
         }

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java Thu Sep  9 06:07:44 2010
@@ -24,7 +24,10 @@ import java.util.List;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
-import org.apache.cxf.continuations.SuspendedInvocationException;
+
+import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.transport.MessageObserver;
@@ -48,6 +51,10 @@ public class JMSContinuationTest extends
     @Before
     public void setUp() {
         m = new MessageImpl();
+        Exchange exchange = new ExchangeImpl();
+        m.setExchange(exchange);
+        m.setInterceptorChain(EasyMock.createMock(InterceptorChain.class));
+        exchange.setInMessage(m);
         continuations = new LinkedList<JMSContinuation>();
         b = BusFactory.getDefaultBus();
         observer = EasyMock.createMock(MessageObserver.class);
@@ -66,12 +73,9 @@ public class JMSContinuationTest extends
     public void testSuspendResume() {
         TestJMSContinuationWrapper cw = 
             new TestJMSContinuationWrapper(b, m, observer, continuations, null, new JMSConfiguration());
-        try {
-            cw.suspend(5000);
-            fail("SuspendInvocation exception expected");
-        } catch (SuspendedInvocationException ex) {
-            // ignore
-        }
+        
+        cw.suspend(5000);
+          
         assertFalse(cw.isNew());
         assertTrue(cw.isPending());
         assertFalse(cw.isResumed());
@@ -122,12 +126,8 @@ public class JMSContinuationTest extends
     
     private void suspendResumeCheckStartAndStop(JMSContinuation cw, JMSConfiguration config,
                                             DefaultMessageListenerContainerStub springContainer) {
-        try {
-            cw.suspend(5000);
-            fail("SuspendInvocation exception expected");
-        } catch (SuspendedInvocationException ex) {
-            // ignore
-        }
+        cw.suspend(5000);
+            
         assertEquals(continuations.size(), 1);
         assertSame(continuations.get(0), cw);
         assertTrue(springContainer.isStop());

Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java?rev=995320&r1=995319&r2=995320&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java (original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java Thu Sep  9 06:07:44 2010
@@ -38,7 +38,6 @@ import org.w3c.dom.Node;
 
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
-import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.helpers.DOMUtils;
 
 
@@ -78,35 +77,25 @@ public class HWSoapMessageDocProvider im
             ContinuationProvider contProvider = 
                 (ContinuationProvider) messageContext.get(ContinuationProvider.class.getName());
             final Continuation continuation = contProvider.getContinuation();
-            synchronized (continuation) {
-                if (continuation.isNew()) {
-
-                    new Thread(new Runnable() {
-
-                        public void run() {
-                            try {
-                                synchronized (continuation) {
-                                    continuation.resume();
-                                }
-                            } catch (Exception e) {
-                                e.printStackTrace();
-                            }
+            
+            if (continuation.isNew()) {
+                continuation.suspend(5000);
+                new Thread(new Runnable() {
+                    public void run() {
+                        try {
+                            continuation.resume();
+                        } catch (Exception e) {
+                            e.printStackTrace();
                         }
-
-                    }).start();
-
-                    continuation.suspend(5000);
-                    throw new RuntimeException("The continuation provider doesn't "
-                            + "support asynchronous continuations");
-                    
-                } else if (!continuation.isResumed()) {
-                    throw new RuntimeException("time out");
-                } else {
-                    return resumeMessage(request);
-                }
+                    }
+                }).start();
+                return null;
+            } else if (!continuation.isResumed()) {
+                continuation.reset();
+                throw new RuntimeException("time out");
+            } else {
+                return resumeMessage(request);
             }
-        } catch (SuspendedInvocationException e) {
-            throw e;
         } catch (SOAPFaultException e) {
             throw e;
         }