You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text version.
Posted to commits@servicemix.apache.org by cc...@apache.org on 2009/03/25 06:26:43 UTC

svn commit: r758144 - in /servicemix/components/bindings/servicemix-jms/trunk/src: main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java

Author: ccustine
Date: Wed Mar 25 05:26:42 2009
New Revision: 758144

URL: http://svn.apache.org/viewvc?rev=758144&view=rev
Log:
SMXCOMP-486 - InOnly exchange does not rollback jms message on exchange error

Modified:
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java?rev=758144&r1=758143&r2=758144&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java Wed Mar 25 05:26:42 2009
@@ -426,14 +426,6 @@
     }
 
     protected void processExchange(final MessageExchange exchange, final Session session, final JmsContext context) throws Exception {
-        if (exchange instanceof InOnly) {
-            // throw an exception when the exchange is transacted to ensure correct rollback of the transaction
-            if (exchange.isTransacted() && ExchangeStatus.ERROR.equals(exchange.getStatus())) {
-                throw exchange.getError();
-            }
-            // ignore non-transacted and/or DONE exchanges
-            return;
-        }
         // Create session if needed
         if (session == null) {
             template.execute(new SessionCallback() {

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=758144&r1=758143&r2=758144&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java Wed Mar 25 05:26:42 2009
@@ -21,7 +21,10 @@
 
 import javax.activation.DataHandler;
 import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jms.ConnectionFactory;
 import javax.mail.util.ByteArrayDataSource;
@@ -39,6 +42,7 @@
 import org.apache.servicemix.jms.endpoints.DefaultProviderMarshaler;
 import org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint;
 import org.apache.servicemix.jms.endpoints.JmsProviderEndpoint;
+import org.springframework.jms.core.JmsTemplate;
 
 public class JmsProviderConsumerEndpointTest extends AbstractJmsTestSupport {
 
@@ -61,7 +65,7 @@
         InOut inout = null;
         boolean result = false;
         DataHandler dh = null;
-        
+
         // Test successful return
         inout = client.createInOutExchange();
         inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
@@ -76,16 +80,16 @@
         assertNotNull(src);
         dh = out.getAttachment("myImage");
         assertNotNull(dh);
-        
+
         logger.info(new SourceTransformer().toString(src));
 
-        // Test fault return 
+        // Test fault return
         container.deactivateComponent("receiver");
         ReturnFaultComponent fault = new ReturnFaultComponent();
         ActivationSpec asFault = new ActivationSpec("receiver", fault);
         asFault.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
         container.activateComponent(asFault);
-        
+
         inout = client.createInOutExchange();
         inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
         inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
@@ -101,7 +105,7 @@
         ActivationSpec asError = new ActivationSpec("receiver", error);
         asError.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
         container.activateComponent(asError);
-        
+
         inout = client.createInOutExchange();
         inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
         inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
@@ -111,6 +115,69 @@
 
     }
 
+    public void testProviderInOnlyWithJmsTransactions() throws Exception {
+        ConnectionFactory connFactory = new PooledConnectionFactory(connectionFactory);
+        JmsComponent jmsComponent = new JmsComponent();
+        JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory, true);
+        consumerEndpoint.setTransacted("jms");
+        JmsProviderEndpoint providerEndpoint = createProviderEndpoint(connFactory);
+        jmsComponent.setEndpoints(new JmsEndpointType[] {consumerEndpoint, providerEndpoint});
+        container.activateComponent(jmsComponent, "servicemix-jms");
+
+        final int[] receiveCount = new int[]{0};
+
+        ReturnErrorComponent error = new ReturnErrorComponent(new RuntimeException("Error: abort... abort...!!")) {
+            public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+                receiveCount[0]++;
+                super.onMessageExchange(exchange);
+            }
+        };
+
+        ActivationSpec asError = new ActivationSpec("receiver", error);
+        asError.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        container.activateComponent(asError);
+
+        InOnly exchange = client.createInOnlyExchange();
+        exchange.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        exchange.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        client.sendSync(exchange);
+
+        // Loop and wait for at least one attempt to process the message
+        for (int i = 0; i < 5; i++) {
+            Thread.sleep(1000);
+            if (receiveCount[0] > 0) {
+                break;
+            }
+        }
+
+        assertTrue("The message was never processed by servicemix-jms", receiveCount[0] > 0);
+
+        // Deactivate the JMS component so that it stops
+        // trying to get the message from the queue
+        container.deactivateComponent("servicemix-jms");
+
+        JmsTemplate template = new JmsTemplate(connFactory);
+        template.setReceiveTimeout(2000);
+        assertNotNull("Message should still be on the queue", template.receive("destination"));
+    }
+
+
+    private JmsConsumerEndpoint createInOnlyConsumerEndpoint(ConnectionFactory connFactory,
+                                                             boolean rollbackOnError) throws URISyntaxException {
+        JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
+        endpoint.setService(new QName("http://jms.servicemix.org/Test", "Consumer"));
+        endpoint.setEndpoint("endpoint");
+        DefaultConsumerMarshaler marshaler = new DefaultConsumerMarshaler();
+        marshaler.setMep(new URI("http://www.w3.org/2004/08/wsdl/in-only"));
+        marshaler.setRollbackOnError(rollbackOnError);
+        endpoint.setMarshaler(marshaler);
+        endpoint.setListenerType("simple");
+        endpoint.setConnectionFactory(connFactory);
+        endpoint.setDestinationName("destination");
+        endpoint.setTargetService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        return endpoint;
+    }
+
     private JmsConsumerEndpoint createConsumerEndpoint(ConnectionFactory connFactory) throws URISyntaxException {
         JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
         endpoint.setService(new QName("http://jms.servicemix.org/Test", "Consumer"));
@@ -125,7 +192,7 @@
         endpoint.setTargetService(new QName("http://jms.servicemix.org/Test", "Echo"));
         return endpoint;
     }
-    
+
     private JmsProviderEndpoint createProviderEndpoint(ConnectionFactory connFactory) {
         JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
         endpoint.setService(new QName("http://jms.servicemix.org/Test", "Provider"));