You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by cc...@apache.org on 2009/04/02 05:40:02 UTC

svn commit: r761157 - in /servicemix/components/bindings/servicemix-jms/trunk/src: main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java

Author: ccustine
Date: Thu Apr  2 03:40:02 2009
New Revision: 761157

URL: http://svn.apache.org/viewvc?rev=761157&view=rev
Log:
SMXCOMP-497 - Successful InOnly MEP rolling back messages to JMS queue

Modified:
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java?rev=761157&r1=761156&r2=761157&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java Thu Apr  2 03:40:02 2009
@@ -426,6 +426,16 @@
     }
 
     protected void processExchange(final MessageExchange exchange, final Session session, final JmsContext context) throws Exception {
+        if (exchange instanceof InOnly) {
+            if ((ExchangeStatus.ERROR.equals(exchange.getStatus())) && (marshaler instanceof DefaultConsumerMarshaler)) {
+                if (((DefaultConsumerMarshaler)marshaler).isRollbackOnError()) {
+                    throw exchange.getError();
+                }
+            }
+            // For InOnly exchanges, ignore DONE exchanges or those where isRollbackOnError is false
+            return;
+        }
+
         // Create session if needed
         if (session == null) {
             template.execute(new SessionCallback() {

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=761157&r1=761156&r2=761157&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java Thu Apr  2 03:40:02 2009
@@ -42,6 +42,7 @@
 import org.apache.servicemix.jms.endpoints.DefaultProviderMarshaler;
 import org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint;
 import org.apache.servicemix.jms.endpoints.JmsProviderEndpoint;
+import org.apache.servicemix.tck.ReceiverComponent;
 import org.springframework.jms.core.JmsTemplate;
 
 public class JmsProviderConsumerEndpointTest extends AbstractJmsTestSupport {
@@ -115,8 +116,13 @@
 
     }
 
-    public void testProviderInOnlyWithJmsTransactions() throws Exception {
+    public void testProviderInOnlyWithJmsTxRollback() throws Exception {
         ConnectionFactory connFactory = new PooledConnectionFactory(connectionFactory);
+        JmsTemplate template = new JmsTemplate(connFactory);
+        template.setReceiveTimeout(2000);
+        // Make sure there are no messages stuck on queue from previous tests
+        template.receive("destination");
+
         JmsComponent jmsComponent = new JmsComponent();
         JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory, true);
         consumerEndpoint.setTransacted("jms");
@@ -156,9 +162,40 @@
         // trying to get the message from the queue
         container.deactivateComponent("servicemix-jms");
 
+        assertNotNull("Message should still be on the queue", template.receive("destination"));
+    }
+
+    public void testProviderInOnlyWithJmsTx() throws Exception {
+        ConnectionFactory connFactory = new PooledConnectionFactory(connectionFactory);
         JmsTemplate template = new JmsTemplate(connFactory);
         template.setReceiveTimeout(2000);
-        assertNotNull("Message should still be on the queue", template.receive("destination"));
+        // Make sure there are no messages stuck on queue from previous tests
+        template.receive("destination");
+
+        JmsComponent jmsComponent = new JmsComponent();
+        JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory, true);
+        consumerEndpoint.setTransacted("jms");
+        JmsProviderEndpoint providerEndpoint = createProviderEndpoint(connFactory);
+        jmsComponent.setEndpoints(new JmsEndpointType[] {consumerEndpoint, providerEndpoint});
+        container.activateComponent(jmsComponent, "servicemix-jms");
+
+        ReceiverComponent rcvr = new ReceiverComponent();
+        rcvr.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+        rcvr.setEndpoint("endpoint");
+        ActivationSpec asRcvr = new ActivationSpec("receiver", rcvr);
+        container.activateComponent(asRcvr);
+
+        InOnly exchange;
+        exchange = client.createInOnlyExchange();
+        exchange.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+        exchange.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        client.sendSync(exchange);
+        rcvr.getMessageList().assertMessagesReceived(1);
+
+        container.deactivateComponent("servicemix-jms");
+
+        assertNull("Message should not be on the queue", template.receive("destination"));
+
     }
 
 
@@ -174,6 +211,8 @@
         endpoint.setListenerType("simple");
         endpoint.setConnectionFactory(connFactory);
         endpoint.setDestinationName("destination");
+        endpoint.setRecoveryInterval(10000);
+        endpoint.setConcurrentConsumers(1);
         endpoint.setTargetService(new QName("http://jms.servicemix.org/Test", "Echo"));
         return endpoint;
     }