You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by cc...@apache.org on 2009/04/08 19:14:45 UTC

svn commit: r763324 - in /servicemix/components/bindings/servicemix-jms/trunk/src: main/java/org/apache/servicemix/jms/endpoints/ test/java/org/apache/servicemix/jms/

Author: ccustine
Date: Wed Apr  8 17:14:45 2009
New Revision: 763324

URL: http://svn.apache.org/viewvc?rev=763324&view=rev
Log:
SMXCOMP-474 - Regression introduced in servicemix-jms default consumer which does not rollback the transactions by default when an exchange with an ERROR status comes back
- With transaction=jms or xa, rollbackonerror flag defaults to true.
- With transaction=none, rollbackonerror is false.

Modified:
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsConsumerEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java?rev=763324&r1=763323&r2=763324&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java Wed Apr  8 17:14:45 2009
@@ -53,7 +53,7 @@
 public class DefaultConsumerMarshaler extends AbstractJmsMarshaler implements JmsConsumerMarshaler {
     
     private URI mep;
-    private boolean rollbackOnError = true;
+    private boolean rollbackOnError;
 
     public DefaultConsumerMarshaler() {
         this.mep = JbiConstants.IN_ONLY;

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsConsumerEndpoint.java?rev=763324&r1=763323&r2=763324&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsConsumerEndpoint.java Wed Apr  8 17:14:45 2009
@@ -549,6 +549,14 @@
             && !TRANSACTED_XA.equals(transacted)) {
             throw new DeploymentException("transacted must be none, jms or xa");
         }
+
+        // Provide some intelligent defaults for rollback policy
+        if (TRANSACTED_XA.equals(transacted) || TRANSACTED_JMS.equals(transacted)) {
+            JmsConsumerMarshaler marshaler = getMarshaler();
+            if (marshaler instanceof DefaultConsumerMarshaler) {
+                ((DefaultConsumerMarshaler)marshaler).setRollbackOnError(true);
+            }
+        }
     }
     
     protected AbstractMessageListenerContainer createListenerContainer() {

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=763324&r1=763323&r2=763324&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java Wed Apr  8 17:14:45 2009
@@ -124,7 +124,7 @@
         template.receive("destination");
 
         JmsComponent jmsComponent = new JmsComponent();
-        JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory, true);
+        JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory);
         consumerEndpoint.setTransacted("jms");
         JmsProviderEndpoint providerEndpoint = createProviderEndpoint(connFactory);
         jmsComponent.setEndpoints(new JmsEndpointType[] {consumerEndpoint, providerEndpoint});
@@ -173,7 +173,7 @@
         template.receive("destination");
 
         JmsComponent jmsComponent = new JmsComponent();
-        JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory, true);
+        JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory); //, true);
         consumerEndpoint.setTransacted("jms");
         JmsProviderEndpoint providerEndpoint = createProviderEndpoint(connFactory);
         jmsComponent.setEndpoints(new JmsEndpointType[] {consumerEndpoint, providerEndpoint});
@@ -196,15 +196,59 @@
 
     }
 
+    public void testProviderInOnlyWithNoneTx() throws Exception {
+        ConnectionFactory connFactory = new PooledConnectionFactory(connectionFactory);
+        JmsTemplate template = new JmsTemplate(connFactory);
+        template.setReceiveTimeout(2000);
+        // Make sure there are no messages stuck on queue from previous tests
+        template.receive("destination");
+
+        JmsComponent jmsComponent = new JmsComponent();
+        JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory);
+        consumerEndpoint.setTransacted("none");
+        JmsProviderEndpoint providerEndpoint = createProviderEndpoint(connFactory);
+        jmsComponent.setEndpoints(new JmsEndpointType[] {consumerEndpoint, providerEndpoint});
+        container.activateComponent(jmsComponent, "servicemix-jms");
+
+        final int[] receiveCount = new int[]{0};
+
+        ReturnErrorComponent error = new ReturnErrorComponent(new RuntimeException("Error: abort... abort...!!")) {
+            public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+                receiveCount[0]++;
+                super.onMessageExchange(exchange);
+            }
+        };
+
+        ActivationSpec asError = new ActivationSpec("receiver", error);
+        asError.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        container.activateComponent(asError);
+
+        InOnly exchange = client.createInOnlyExchange();
+        exchange.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        exchange.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        client.sendSync(exchange);
+
+        // Loop and wait for at least one attempt to process the message
+        for (int i = 0; i < 5; i++) {
+            Thread.sleep(1000);
+            if (receiveCount[0] > 0) {
+                break;
+            }
+        }
+
+        assertTrue("The message was never processed by servicemix-jms", receiveCount[0] > 0);
+
+        assertNull("Message should not be on the queue", template.receive("destination"));
+    }
+
+
 
-    private JmsConsumerEndpoint createInOnlyConsumerEndpoint(ConnectionFactory connFactory,
-                                                             boolean rollbackOnError) throws URISyntaxException {
+    private JmsConsumerEndpoint createInOnlyConsumerEndpoint(ConnectionFactory connFactory) throws URISyntaxException {
         JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
         endpoint.setService(new QName("http://jms.servicemix.org/Test", "Consumer"));
         endpoint.setEndpoint("endpoint");
         DefaultConsumerMarshaler marshaler = new DefaultConsumerMarshaler();
         marshaler.setMep(new URI("http://www.w3.org/2004/08/wsdl/in-only"));
-        marshaler.setRollbackOnError(rollbackOnError);
         endpoint.setMarshaler(marshaler);
         endpoint.setListenerType("simple");
         endpoint.setConnectionFactory(connFactory);
@@ -221,7 +265,6 @@
         endpoint.setEndpoint("endpoint");
         DefaultConsumerMarshaler marshaler = new DefaultConsumerMarshaler();
         marshaler.setMep(new URI("http://www.w3.org/2004/08/wsdl/in-out"));
-        marshaler.setRollbackOnError(false);
         endpoint.setMarshaler(marshaler);
         endpoint.setListenerType("simple");
         endpoint.setConnectionFactory(connFactory);