You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/12/30 11:17:25 UTC
svn commit: r730085 - in
/servicemix/components/bindings/servicemix-jms/trunk: ./
src/main/java/org/apache/servicemix/jms/endpoints/
src/test/java/org/apache/servicemix/jms/
Author: gertv
Date: Tue Dec 30 02:17:25 2008
New Revision: 730085
URL: http://svn.apache.org/viewvc?rev=730085&view=rev
Log:
SM-1752: No rollback for failed InOnly exchange
Added:
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java (with props)
Modified:
servicemix/components/bindings/servicemix-jms/trunk/pom.xml
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java
Modified: servicemix/components/bindings/servicemix-jms/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/pom.xml?rev=730085&r1=730084&r2=730085&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/pom.xml (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/pom.xml Tue Dec 30 02:17:25 2008
@@ -44,6 +44,7 @@
<servicemix-version>3.3</servicemix-version>
<servicemix-shared-version>2008.02-SNAPSHOT</servicemix-shared-version>
<activemq-version>5.2.0</activemq-version>
+ <jencks-version>2.1</jencks-version>
<servicemix.osgi.import>
javax.security.auth,
@@ -116,7 +117,7 @@
<dependency>
<groupId>org.jencks</groupId>
<artifactId>jencks</artifactId>
- <version>2.1</version>
+ <version>${jencks-version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
@@ -290,6 +291,12 @@
<version>${spring-version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.jencks</groupId>
+ <artifactId>jencks-amqpool</artifactId>
+ <version>${jencks-version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java?rev=730085&r1=730084&r2=730085&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java Tue Dec 30 02:17:25 2008
@@ -426,8 +426,12 @@
}
protected void processExchange(final MessageExchange exchange, final Session session, final JmsContext context) throws Exception {
- // Ignore InOnly exchanges which are currently handled in fire-and-forget mode
if (exchange instanceof InOnly) {
+ // rethrow the error of a transacted exchange that ended in ERROR status so the transaction is rolled back
+ if (exchange.isTransacted() && ExchangeStatus.ERROR.equals(exchange.getStatus())) {
+ throw exchange.getError();
+ }
+ // nothing else to do for exchanges that are non-transacted or not in ERROR status
return;
}
// Create session if needed
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java?rev=730085&r1=730084&r2=730085&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java Tue Dec 30 02:17:25 2008
@@ -256,14 +256,14 @@
endpoint.setListenerType("default");
endpoint.setConnectionFactory(connectionFactory);
endpoint.setDestinationName("destination");
- endpoint.setTransacted("xa");
+ endpoint.setTransacted("jms");
component.setEndpoints(new JmsConsumerEndpoint[] {endpoint});
container.activateComponent(component, "servicemix-jms");
jmsTemplate.convertAndSend("destination", "<hello>world</hello>");
receiver.getMessageList().assertMessagesReceived(1);
Thread.sleep(500);
- }
+ }
public void testConsumerServer() throws Exception {
JmsComponent component = new JmsComponent();
Added: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java?rev=730085&view=auto
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java (added)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java Tue Dec 30 02:17:25 2008
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.jencks.amqpool.XaPooledConnectionFactory;
+
+public class JmsConsumerEndpointXaTest extends AbstractJmsTestSupport {
+
+ private XaPooledConnectionFactory xaConnectionFactory;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ xaConnectionFactory = new XaPooledConnectionFactory("vm://localhost");
+ xaConnectionFactory.setTransactionManager((TransactionManager)container.getTransactionManager());
+
+ container.setAutoEnlistInTransaction(true);
+ }
+
+ public void testConsumerCommitXaTx() throws Exception {
+ JmsComponent component = new JmsComponent();
+ JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
+ endpoint.setService(new QName("jms"));
+ endpoint.setEndpoint("endpoint");
+ endpoint.setTargetService(new QName("error"));
+ endpoint.setListenerType("default");
+ endpoint.setConnectionFactory(xaConnectionFactory);
+ endpoint.setDestinationName("destination");
+ endpoint.setTransacted("xa");
+ component.setEndpoints(new JmsConsumerEndpoint[] {endpoint});
+ container.activateComponent(component, "servicemix-jms");
+
+ ReceiverComponent errors = new ReceiverComponent() {
+
+ @Override
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ assertTransaction(exchange, Status.STATUS_COMMITTED);
+ super.onMessageExchange(exchange);
+ }
+ };
+ errors.setService(new QName("error"));
+ errors.setEndpoint("endpoint");
+ container.activateComponent(errors, "errors");
+
+ jmsTemplate.convertAndSend("destination", "<hello>world</hello>");
+ errors.getMessageList().assertMessagesReceived(1);
+ Thread.sleep(500);
+ }
+
+ public void testConsumerRollbackXaTx() throws Exception {
+ JmsComponent component = new JmsComponent();
+ JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
+ endpoint.setService(new QName("jms"));
+ endpoint.setEndpoint("endpoint");
+ endpoint.setTargetService(new QName("error"));
+ endpoint.setListenerType("default");
+ endpoint.setConnectionFactory(xaConnectionFactory);
+ endpoint.setDestinationName("destination");
+ endpoint.setTransacted("xa");
+ component.setEndpoints(new JmsConsumerEndpoint[] {endpoint});
+ container.activateComponent(component, "servicemix-jms");
+
+ ReceiverComponent errors = new ReceiverComponent() {
+
+ @Override
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ assertTransaction(exchange, Status.STATUS_ROLLEDBACK);
+
+ getMessageList().addMessage(exchange.getMessage("in"));
+ exchange.setError(new RuntimeException("Could you please rollback?"));
+ send(exchange);
+ }
+ };
+ errors.setService(new QName("error"));
+ errors.setEndpoint("endpoint");
+ container.activateComponent(errors, "errors");
+
+ jmsTemplate.convertAndSend("destination", "<hello>world</hello>");
+ errors.getMessageList().assertMessagesReceived(1);
+ Thread.sleep(500);
+ }
+
+ private void assertTransaction(MessageExchange exchange, final int expected) {
+ assertTrue(exchange.isTransacted());
+ try {
+ ((MessageExchangeImpl) exchange).getTransactionContext().registerSynchronization(new Synchronization() {
+ public void afterCompletion(int status) {
+ assertEquals(expected, status);
+ }
+
+ public void beforeCompletion() {
+ }
+ });
+ } catch (IllegalStateException e) {
+ fail(e.getMessage());
+ } catch (RollbackException e) {
+ fail(e.getMessage());
+ } catch (SystemException e) {
+ fail(e.getMessage());
+ }
+ }
+}
Propchange: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointXaTest.java
------------------------------------------------------------------------------
svn:eol-style = native