You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/12/15 20:32:41 UTC
svn commit: r890957 - in /cxf/trunk: api/src/main/java/org/apache/cxf/phase/
rt/core/src/main/java/org/apache/cxf/interceptor/
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/
systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/
Author: dkulp
Date: Tue Dec 15 19:32:40 2009
New Revision: 890957
URL: http://svn.apache.org/viewvc?rev=890957&view=rev
Log:
[CXF-2550] If JMS transactions aren't being handled by a spring
TransactionProxy, we need to make sure the exceptions are propogated
back to the container
Modified:
cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/Server.java
Modified: cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Tue Dec 15 19:32:40 2009
@@ -302,7 +302,7 @@
}
unwind(message);
- if (faultObserver != null) {
+ if (faultObserver != null && !message.getExchange().isOneWay()) {
faultObserver.onMessage(message);
}
}
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java Tue Dec 15 19:32:40 2009
@@ -104,24 +104,22 @@
};
synchronized (o) {
executor.execute(o);
- if (!exchange.isOneWay()) {
- if (!o.isDone()) {
- try {
- o.wait();
- } catch (InterruptedException e) {
- //IGNORE
- }
- }
+ if (!o.isDone()) {
try {
- o.get();
+ o.wait();
} catch (InterruptedException e) {
- throw new Fault(e);
- } catch (ExecutionException e) {
- if (e.getCause() instanceof RuntimeException) {
- throw (RuntimeException)e.getCause();
- } else {
- throw new Fault(e.getCause());
- }
+ //IGNORE
+ }
+ }
+ try {
+ o.get();
+ } catch (InterruptedException e) {
+ throw new Fault(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException)e.getCause();
+ } else {
+ throw new Fault(e.getCause());
}
}
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Tue Dec 15 19:32:40 2009
@@ -58,15 +58,20 @@
import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
+import org.springframework.jms.connection.JmsResourceHolder;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
-public class JMSDestination extends AbstractMultiplexDestination implements MessageListener,
- JMSExchangeSender {
+public class JMSDestination extends AbstractMultiplexDestination
+ implements SessionAwareMessageListener, MessageListener, JMSExchangeSender {
private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
@@ -167,6 +172,9 @@
* @throws IOException
*/
public void onMessage(javax.jms.Message message) {
+ onMessage(message, null);
+ }
+ public void onMessage(javax.jms.Message message, Session session) {
try {
getLogger().log(Level.FINE, "server received request: ", message);
// Build CXF message from JMS message
@@ -194,9 +202,32 @@
inMessage.setContent(MessageEndpoint.class, ep);
JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.remove();
}
-
+
// handle the incoming message
incomingObserver.onMessage(inMessage);
+
+ //need to propagate any exceptions back to Spring container
+ //so transactions can occur
+ if (inMessage.getContent(Exception.class) != null && session != null) {
+ PlatformTransactionManager m = jmsConfig.getTransactionManager();
+ if (m != null) {
+ TransactionStatus status = m.getTransaction(null);
+ JmsResourceHolder resourceHolder =
+ (JmsResourceHolder) TransactionSynchronizationManager
+ .getResource(jmsConfig.getConnectionFactory());
+ boolean trans = resourceHolder == null
+ || !resourceHolder.containsSession(session);
+ if (status != null && !status.isCompleted() && trans) {
+ Exception ex = inMessage.getContent(Exception.class);
+ if (ex.getCause() instanceof RuntimeException) {
+ throw (RuntimeException)ex.getCause();
+ } else {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+
} catch (SuspendedInvocationException ex) {
getLogger().log(Level.FINE, "Request message has been suspended");
} catch (UnsupportedEncodingException ex) {
Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java (original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java Tue Dec 15 19:32:40 2009
@@ -20,9 +20,12 @@
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jws.WebService;
+
import org.apache.cxf.systest.jms.GreeterImplDocBase;
+@WebService(endpointInterface = "org.apache.hello_world_doc_lit.Greeter")
public class GreeterImplWithTransaction extends GreeterImplDocBase {
private AtomicBoolean flag = new AtomicBoolean(true);
Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java (original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java Tue Dec 15 19:32:40 2009
@@ -23,10 +23,14 @@
import java.util.HashMap;
import java.util.Map;
+import javax.jms.ConnectionFactory;
import javax.xml.namespace.QName;
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
import org.apache.cxf.testutil.common.EmbeddedJMSBrokerLauncher;
+import org.apache.cxf.transport.jms.JMSConfigFeature;
+import org.apache.cxf.transport.jms.JMSConfiguration;
import org.apache.hello_world_doc_lit.Greeter;
import org.apache.hello_world_doc_lit.PingMeFault;
import org.apache.hello_world_doc_lit.SOAPService2;
@@ -78,10 +82,35 @@
SOAPService2 service = new SOAPService2(wsdl, serviceName);
assertNotNull(service);
+ Greeter greeter = service.getPort(portName, Greeter.class);
+ doService(greeter, true);
+ }
+ @Test
+ public void testNonAopTransaction() throws Exception {
+ JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+ factory.setServiceClass(Greeter.class);
+ factory.setAddress("jms://");
+
+ JMSConfiguration jmsConfig = new JMSConfiguration();
+ ConnectionFactory connectionFactory
+ = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61500");
+ jmsConfig.setConnectionFactory(connectionFactory);
+ jmsConfig.setTargetDestination("greeter.queue.noaop");
+ jmsConfig.setPubSubDomain(false);
+ jmsConfig.setUseJms11(true);
+
+ JMSConfigFeature jmsConfigFeature = new JMSConfigFeature();
+ jmsConfigFeature.setJmsConfig(jmsConfig);
+ factory.getFeatures().add(jmsConfigFeature);
+
+ Greeter greeter = (Greeter)factory.create();
+ doService(greeter, false);
+ }
+ public void doService(Greeter greeter, boolean doEx) throws Exception {
+
String response1 = new String("Hello ");
try {
- Greeter greeter = service.getPort(portName, Greeter.class);
String greeting = greeter.greetMe("Good guy");
assertNotNull("No response received from service", greeting);
@@ -93,12 +122,14 @@
exResponse = response1 + "[Bad guy]";
assertEquals("Get unexcpeted result", exResponse, greeting);
- try {
- greeter.pingMe();
- fail("Should have thrown FaultException");
- } catch (PingMeFault ex) {
- assertNotNull(ex.getFaultInfo());
- }
+ if (doEx) {
+ try {
+ greeter.pingMe();
+ fail("Should have thrown FaultException");
+ } catch (PingMeFault ex) {
+ assertNotNull(ex.getFaultInfo());
+ }
+ }
} catch (UndeclaredThrowableException ex) {
throw (Exception)ex.getCause();
}
Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/Server.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/Server.java?rev=890957&r1=890956&r2=890957&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/Server.java (original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/tx/Server.java Tue Dec 15 19:32:40 2009
@@ -18,8 +18,14 @@
*/
package org.apache.cxf.systest.jms.tx;
+import javax.jms.ConnectionFactory;
+
+import org.apache.cxf.jaxws.EndpointImpl;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.transport.jms.JMSConfigFeature;
+import org.apache.cxf.transport.jms.JMSConfiguration;
import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.jms.connection.JmsTransactionManager;
public class Server extends AbstractBusTestServerBase {
@@ -29,6 +35,25 @@
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext("org/apache/cxf/systest/jms/tx/jms_server_config.xml");
context.start();
+
+ EndpointImpl endpoint = new EndpointImpl(new GreeterImplWithTransaction());
+ endpoint.setAddress("jms://");
+ JMSConfiguration jmsConfig = new JMSConfiguration();
+
+ ConnectionFactory connectionFactory
+ = (ConnectionFactory)context.getBean("jmsConnectionFactory", ConnectionFactory.class);
+ jmsConfig.setConnectionFactory(connectionFactory);
+ jmsConfig.setTargetDestination("greeter.queue.noaop");
+ jmsConfig.setSessionTransacted(true);
+ jmsConfig.setPubSubDomain(false);
+ jmsConfig.setUseJms11(true);
+ jmsConfig.setTransactionManager(new JmsTransactionManager(connectionFactory));
+ jmsConfig.setCacheLevel(3);
+
+ JMSConfigFeature jmsConfigFeature = new JMSConfigFeature();
+ jmsConfigFeature.setJmsConfig(jmsConfig);
+ endpoint.getFeatures().add(jmsConfigFeature);
+ endpoint.publish();
}