You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/12/15 20:32:41 UTC

svn commit: r890957 - in /cxf/trunk: api/src/main/java/org/apache/cxf/phase/ rt/core/src/main/java/org/apache/cxf/interceptor/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/

Author: dkulp
Date: Tue Dec 15 19:32:40 2009
New Revision: 890957

URL: http://svn.apache.org/viewvc?rev=890957&view=rev
Log:
[CXF-2550] If JMS transactions aren't being handled by a spring
TransactionProxy, we need to make sure the exceptions are propagated
back to the container

Modified:
    cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/Server.java

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Tue Dec 15 19:32:40 2009
@@ -302,7 +302,7 @@
                         }                    
                         unwind(message);
                         
-                        if (faultObserver != null) {
+                        if (faultObserver != null && !message.getExchange().isOneWay()) {
                             faultObserver.onMessage(message);
                         }
                     }

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java Tue Dec 15 19:32:40 2009
@@ -104,24 +104,22 @@
             };
             synchronized (o) {
                 executor.execute(o);
-                if (!exchange.isOneWay()) {
-                    if (!o.isDone()) {
-                        try {
-                            o.wait();
-                        } catch (InterruptedException e) {
-                            //IGNORE
-                        }
-                    }
+                if (!o.isDone()) {
                     try {
-                        o.get();
+                        o.wait();
                     } catch (InterruptedException e) {
-                        throw new Fault(e);
-                    } catch (ExecutionException e) {
-                        if (e.getCause() instanceof RuntimeException) {
-                            throw (RuntimeException)e.getCause();
-                        } else {
-                            throw new Fault(e.getCause());
-                        }
+                        //IGNORE
+                    }
+                }
+                try {
+                    o.get();
+                } catch (InterruptedException e) {
+                    throw new Fault(e);
+                } catch (ExecutionException e) {
+                    if (e.getCause() instanceof RuntimeException) {
+                        throw (RuntimeException)e.getCause();
+                    } else {
+                        throw new Fault(e.getCause());
                     }
                 }
             }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Tue Dec 15 19:32:40 2009
@@ -58,15 +58,20 @@
 import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
+import org.springframework.jms.connection.JmsResourceHolder;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.SessionAwareMessageListener;
 import org.springframework.jms.support.JmsUtils;
 import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
 
-public class JMSDestination extends AbstractMultiplexDestination implements MessageListener,
-    JMSExchangeSender {
+public class JMSDestination extends AbstractMultiplexDestination 
+    implements SessionAwareMessageListener, MessageListener, JMSExchangeSender {
 
     private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
 
@@ -167,6 +172,9 @@
      * @throws IOException
      */
     public void onMessage(javax.jms.Message message) {
+        onMessage(message, null);
+    }
+    public void onMessage(javax.jms.Message message, Session session) {
         try {
             getLogger().log(Level.FINE, "server received request: ", message);
              // Build CXF message from JMS message
@@ -194,9 +202,32 @@
                 inMessage.setContent(MessageEndpoint.class, ep);
                 JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.remove();
             }
-            
+
             // handle the incoming message
             incomingObserver.onMessage(inMessage);
+            
+            //need to propagate any exceptions back to Spring container 
+            //so transactions can occur
+            if (inMessage.getContent(Exception.class) != null && session != null) {
+                PlatformTransactionManager m = jmsConfig.getTransactionManager();
+                if (m != null) {
+                    TransactionStatus status = m.getTransaction(null);
+                    JmsResourceHolder resourceHolder =
+                        (JmsResourceHolder) TransactionSynchronizationManager
+                            .getResource(jmsConfig.getConnectionFactory());
+                    boolean trans = resourceHolder == null 
+                        || !resourceHolder.containsSession(session);
+                    if (status != null && !status.isCompleted() && trans) {
+                        Exception ex = inMessage.getContent(Exception.class);
+                        if (ex.getCause() instanceof RuntimeException) {
+                            throw (RuntimeException)ex.getCause();
+                        } else {
+                            throw new RuntimeException(ex);
+                        }
+                    }
+                }
+            }
+            
         } catch (SuspendedInvocationException ex) {
             getLogger().log(Level.FINE, "Request message has been suspended");
         } catch (UnsupportedEncodingException ex) {

Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java (original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java Tue Dec 15 19:32:40 2009
@@ -20,9 +20,12 @@
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.jws.WebService;
+
 
 import org.apache.cxf.systest.jms.GreeterImplDocBase;
 
+@WebService(endpointInterface = "org.apache.hello_world_doc_lit.Greeter")
 public class GreeterImplWithTransaction extends GreeterImplDocBase {
     private AtomicBoolean flag = new AtomicBoolean(true);
        

Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java (original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java Tue Dec 15 19:32:40 2009
@@ -23,10 +23,14 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.jms.ConnectionFactory;
 import javax.xml.namespace.QName;
 
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 import org.apache.cxf.testutil.common.EmbeddedJMSBrokerLauncher;
+import org.apache.cxf.transport.jms.JMSConfigFeature;
+import org.apache.cxf.transport.jms.JMSConfiguration;
 import org.apache.hello_world_doc_lit.Greeter;
 import org.apache.hello_world_doc_lit.PingMeFault;
 import org.apache.hello_world_doc_lit.SOAPService2;
@@ -78,10 +82,35 @@
         SOAPService2 service = new SOAPService2(wsdl, serviceName);
         assertNotNull(service);
 
+        Greeter greeter = service.getPort(portName, Greeter.class);
+        doService(greeter, true);
+    }
+    @Test
+    public void testNonAopTransaction() throws Exception {
+        JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+        factory.setServiceClass(Greeter.class);
+        factory.setAddress("jms://");
+
+        JMSConfiguration jmsConfig = new JMSConfiguration();
+        ConnectionFactory connectionFactory
+            = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61500");
+        jmsConfig.setConnectionFactory(connectionFactory);
+        jmsConfig.setTargetDestination("greeter.queue.noaop");
+        jmsConfig.setPubSubDomain(false);
+        jmsConfig.setUseJms11(true);
+
+        JMSConfigFeature jmsConfigFeature = new JMSConfigFeature();
+        jmsConfigFeature.setJmsConfig(jmsConfig);
+        factory.getFeatures().add(jmsConfigFeature);
+
+        Greeter greeter = (Greeter)factory.create();
+        doService(greeter, false);
+    }    
+    public void doService(Greeter greeter, boolean doEx) throws Exception {
+
         String response1 = new String("Hello ");
         
         try {
-            Greeter greeter = service.getPort(portName, Greeter.class);
                           
             String greeting = greeter.greetMe("Good guy");
             assertNotNull("No response received from service", greeting);
@@ -93,12 +122,14 @@
             exResponse = response1 + "[Bad guy]";
             assertEquals("Get unexcpeted result", exResponse, greeting);
             
-            try {
-                greeter.pingMe();
-                fail("Should have thrown FaultException");
-            } catch (PingMeFault ex) {
-                assertNotNull(ex.getFaultInfo());
-            }  
+            if (doEx) {
+                try {
+                    greeter.pingMe();
+                    fail("Should have thrown FaultException");
+                } catch (PingMeFault ex) {
+                    assertNotNull(ex.getFaultInfo());
+                }
+            }
         } catch (UndeclaredThrowableException ex) {
             throw (Exception)ex.getCause();
         }

Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/Server.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/Server.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/Server.java (original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/Server.java Tue Dec 15 19:32:40 2009
@@ -18,8 +18,14 @@
  */
 package org.apache.cxf.systest.jms.tx;
 
+import javax.jms.ConnectionFactory;
+
+import org.apache.cxf.jaxws.EndpointImpl;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.transport.jms.JMSConfigFeature;
+import org.apache.cxf.transport.jms.JMSConfiguration;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.jms.connection.JmsTransactionManager;
 
 public class Server extends AbstractBusTestServerBase {
 
@@ -29,6 +35,25 @@
         ClassPathXmlApplicationContext context = 
             new ClassPathXmlApplicationContext("org/apache/cxf/systest/jms/tx/jms_server_config.xml");
         context.start();
+        
+        EndpointImpl endpoint = new EndpointImpl(new GreeterImplWithTransaction());
+        endpoint.setAddress("jms://");
+        JMSConfiguration jmsConfig = new JMSConfiguration();
+
+        ConnectionFactory connectionFactory
+            = (ConnectionFactory)context.getBean("jmsConnectionFactory", ConnectionFactory.class);
+        jmsConfig.setConnectionFactory(connectionFactory);
+        jmsConfig.setTargetDestination("greeter.queue.noaop");
+        jmsConfig.setSessionTransacted(true);
+        jmsConfig.setPubSubDomain(false);
+        jmsConfig.setUseJms11(true);
+        jmsConfig.setTransactionManager(new JmsTransactionManager(connectionFactory));
+        jmsConfig.setCacheLevel(3);
+
+        JMSConfigFeature jmsConfigFeature = new JMSConfigFeature();
+        jmsConfigFeature.setJmsConfig(jmsConfig);
+        endpoint.getFeatures().add(jmsConfigFeature);
+        endpoint.publish();
     }