You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/12/30 11:17:02 UTC

svn commit: r730084 - in /servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms: ./ src/main/java/org/apache/servicemix/jms/endpoints/ src/test/java/org/apache/servicemix/jms/

Author: gertv
Date: Tue Dec 30 02:17:02 2008
New Revision: 730084

URL: http://svn.apache.org/viewvc?rev=730084&view=rev
Log:
SM-1752: No rollback for failed InOnly exchange

Added:
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java   (with props)
Modified:
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/pom.xml
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/pom.xml?rev=730084&r1=730083&r2=730084&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/pom.xml (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/pom.xml Tue Dec 30 02:17:02 2008
@@ -198,6 +198,12 @@
       <version>${junit-version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.jencks</groupId>
+      <artifactId>jencks-amqpool</artifactId>
+      <version>${jencks-version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java?rev=730084&r1=730083&r2=730084&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java Tue Dec 30 02:17:02 2008
@@ -354,8 +354,12 @@
     }
 
     protected void processExchange(final MessageExchange exchange, final Session session, final JmsContext context) throws Exception {
-        // Ignore InOnly exchanges which are currently handled in fire-and-forget mode
         if (exchange instanceof InOnly) {
+            // throw an exception when the exchange is transacted to ensure correct rollback of the transaction
+            if (exchange.isTransacted() && ExchangeStatus.ERROR.equals(exchange.getStatus())) {
+                throw exchange.getError();
+            }
+            // ignore non-transacted and/or DONE exchanges
             return;
         }
         // Create session if needed

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java?rev=730084&r1=730083&r2=730084&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java Tue Dec 30 02:17:02 2008
@@ -83,8 +83,7 @@
     public MessageExchange createExchange(JmsContext jmsContext, ComponentContext jbiContext)
         throws Exception {
         Context ctx = (Context)jmsContext;
-        MessageExchange exchange = jbiContext.getDeliveryChannel().createExchangeFactory()
-            .createExchange(mep);
+        MessageExchange exchange = jbiContext.getDeliveryChannel().createExchangeFactory().createExchange(mep);
         NormalizedMessage inMessage = exchange.createMessage();
         populateMessage(ctx.message, inMessage);
         if (isCopyProperties()) {

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java?rev=730084&r1=730083&r2=730084&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java Tue Dec 30 02:17:02 2008
@@ -256,14 +256,14 @@
         endpoint.setListenerType("default");
         endpoint.setConnectionFactory(connectionFactory);
         endpoint.setDestinationName("destination");
-        endpoint.setTransacted("xa");
+        endpoint.setTransacted("jms");
         component.setEndpoints(new JmsConsumerEndpoint[] {endpoint});
         container.activateComponent(component, "servicemix-jms");
 
         jmsTemplate.convertAndSend("destination", "<hello>world</hello>");
         receiver.getMessageList().assertMessagesReceived(1);
         Thread.sleep(500);
-    }
+    } 
 
     public void testConsumerServer() throws Exception {
         JmsComponent component = new JmsComponent();

Added: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java?rev=730084&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java Tue Dec 30 02:17:02 2008
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.jencks.amqpool.XaPooledConnectionFactory;
+
+/**
+ * Verifies that a JMS consumer endpoint configured with XA transactions commits the
+ * transaction for a successful InOnly exchange and rolls it back for a failed one.
+ */
+public class JmsConsumerEndpointXaTest extends AbstractJmsTestSupport {
+
+    /** Maximum time, in milliseconds, to wait for the transaction outcome. */
+    private static final long COMPLETION_TIMEOUT_MS = 5000;
+
+    private XaPooledConnectionFactory xaConnectionFactory;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        xaConnectionFactory = new XaPooledConnectionFactory("vm://localhost");
+        xaConnectionFactory.setTransactionManager((TransactionManager)container.getTransactionManager());
+
+        container.setAutoEnlistInTransaction(true);
+    }
+
+    /** A successfully handled message must leave the XA transaction committed. */
+    public void testConsumerCommitXaTx() throws Exception {
+        activateXaConsumer();
+
+        final TransactionProbe probe = new TransactionProbe();
+        ReceiverComponent errors = new ReceiverComponent() {
+
+            @Override
+            public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+                probe.watch(exchange);
+                super.onMessageExchange(exchange);
+            }
+        };
+        activateErrorReceiver(errors);
+
+        jmsTemplate.convertAndSend("destination", "<hello>world</hello>");
+        errors.getMessageList().assertMessagesReceived(1);
+        assertTransactionCompleted(probe, Status.STATUS_COMMITTED);
+    }
+
+    /** An exchange answered with an error must leave the XA transaction rolled back. */
+    public void testConsumerRollbackXaTx() throws Exception {
+        activateXaConsumer();
+
+        final TransactionProbe probe = new TransactionProbe();
+        ReceiverComponent errors = new ReceiverComponent() {
+
+            @Override
+            public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+                probe.watch(exchange);
+
+                getMessageList().addMessage(exchange.getMessage("in"));
+                exchange.setError(new RuntimeException("Could you please rollback?"));
+                send(exchange);
+            }
+        };
+        activateErrorReceiver(errors);
+
+        jmsTemplate.convertAndSend("destination", "<hello>world</hello>");
+        errors.getMessageList().assertMessagesReceived(1);
+        assertTransactionCompleted(probe, Status.STATUS_ROLLEDBACK);
+    }
+
+    /** Activates an XA-transacted consumer endpoint that targets the "error" service. */
+    private void activateXaConsumer() throws Exception {
+        JmsComponent component = new JmsComponent();
+        JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
+        endpoint.setService(new QName("jms"));
+        endpoint.setEndpoint("endpoint");
+        endpoint.setTargetService(new QName("error"));
+        endpoint.setListenerType("default");
+        endpoint.setConnectionFactory(xaConnectionFactory);
+        endpoint.setDestinationName("destination");
+        endpoint.setTransacted("xa");
+        component.setEndpoints(new JmsConsumerEndpoint[] {endpoint});
+        container.activateComponent(component, "servicemix-jms");
+    }
+
+    /** Registers the receiver as the "error" service the consumer endpoint sends to. */
+    private void activateErrorReceiver(ReceiverComponent receiver) throws Exception {
+        receiver.setService(new QName("error"));
+        receiver.setEndpoint("endpoint");
+        container.activateComponent(receiver, "errors");
+    }
+
+    /**
+     * Waits for the probe's transaction to complete and checks its outcome. This runs on
+     * the test thread, so a wrong outcome fails the test; an assertion thrown from the
+     * transaction callback itself would never reach the JUnit runner.
+     *
+     * @param probe the probe that was registered with the exchange's transaction
+     * @param expected the expected {@link Status} constant
+     * @throws Exception the exception that prevented the probe from being registered, if any
+     */
+    private void assertTransactionCompleted(TransactionProbe probe, int expected) throws Exception {
+        boolean completed = probe.finished.await(COMPLETION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        Exception failure = probe.failure.get();
+        if (failure != null) {
+            throw failure;
+        }
+        assertTrue("Transaction did not complete within " + COMPLETION_TIMEOUT_MS + " ms", completed);
+        assertEquals(expected, probe.outcome.get().intValue());
+    }
+
+    /**
+     * Records the first transaction outcome it is notified of, so that the outcome can be
+     * asserted on the test thread.
+     */
+    private static final class TransactionProbe implements Synchronization {
+
+        private final CountDownLatch finished = new CountDownLatch(1);
+        private final AtomicReference<Integer> outcome = new AtomicReference<Integer>();
+        private final AtomicReference<Exception> failure = new AtomicReference<Exception>();
+
+        /** Registers this probe with the transaction the exchange takes part in. */
+        void watch(MessageExchange exchange) {
+            if (!exchange.isTransacted()) {
+                giveUp(new IllegalStateException("Exchange is not transacted: " + exchange.getExchangeId()));
+                return;
+            }
+            try {
+                ((MessageExchangeImpl) exchange).getTransactionContext().registerSynchronization(this);
+            } catch (IllegalStateException e) {
+                giveUp(e);
+            } catch (RollbackException e) {
+                giveUp(e);
+            } catch (SystemException e) {
+                giveUp(e);
+            }
+        }
+
+        public void beforeCompletion() {
+            // nothing to record before completion
+        }
+
+        public void afterCompletion(int status) {
+            // keep only the first outcome; the probe may be registered more than once
+            if (outcome.compareAndSet(null, Integer.valueOf(status))) {
+                finished.countDown();
+            }
+        }
+
+        /** Remembers why the probe could not be registered and releases the waiting test. */
+        private void giveUp(Exception cause) {
+            failure.compareAndSet(null, cause);
+            finished.countDown();
+        }
+    }
+}

Propchange: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java
------------------------------------------------------------------------------
    svn:eol-style = native