You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@servicemix.apache.org by cc...@apache.org on 2009/04/02 05:40:02 UTC
svn commit: r761157 - in
/servicemix/components/bindings/servicemix-jms/trunk/src:
main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Author: ccustine
Date: Thu Apr 2 03:40:02 2009
New Revision: 761157
URL: http://svn.apache.org/viewvc?rev=761157&view=rev
Log:
SMXCOMP-497 - Successful InOnly MEP rolling back messages to JMS queue
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java?rev=761157&r1=761156&r2=761157&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java Thu Apr 2 03:40:02 2009
@@ -426,6 +426,16 @@
}
protected void processExchange(final MessageExchange exchange, final Session session, final JmsContext context) throws Exception {
+ if (exchange instanceof InOnly) {
+ if ((ExchangeStatus.ERROR.equals(exchange.getStatus())) && (marshaler instanceof DefaultConsumerMarshaler)) {
+ if (((DefaultConsumerMarshaler)marshaler).isRollbackOnError()) {
+ throw exchange.getError();
+ }
+ }
+ // For InOnly exchanges, ignore DONE exchanges or those where isRollbackOnError is false
+ return;
+ }
+
// Create session if needed
if (session == null) {
template.execute(new SessionCallback() {
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=761157&r1=761156&r2=761157&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java Thu Apr 2 03:40:02 2009
@@ -42,6 +42,7 @@
import org.apache.servicemix.jms.endpoints.DefaultProviderMarshaler;
import org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint;
import org.apache.servicemix.jms.endpoints.JmsProviderEndpoint;
+import org.apache.servicemix.tck.ReceiverComponent;
import org.springframework.jms.core.JmsTemplate;
public class JmsProviderConsumerEndpointTest extends AbstractJmsTestSupport {
@@ -115,8 +116,13 @@
}
- public void testProviderInOnlyWithJmsTransactions() throws Exception {
+ public void testProviderInOnlyWithJmsTxRollback() throws Exception {
ConnectionFactory connFactory = new PooledConnectionFactory(connectionFactory);
+ JmsTemplate template = new JmsTemplate(connFactory);
+ template.setReceiveTimeout(2000);
+ // Make sure there are no messages stuck on queue from previous tests
+ template.receive("destination");
+
JmsComponent jmsComponent = new JmsComponent();
JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory, true);
consumerEndpoint.setTransacted("jms");
@@ -156,9 +162,40 @@
// trying to get the message from the queue
container.deactivateComponent("servicemix-jms");
+ assertNotNull("Message should still be on the queue", template.receive("destination"));
+ }
+
+ public void testProviderInOnlyWithJmsTx() throws Exception {
+ ConnectionFactory connFactory = new PooledConnectionFactory(connectionFactory);
JmsTemplate template = new JmsTemplate(connFactory);
template.setReceiveTimeout(2000);
- assertNotNull("Message should still be on the queue", template.receive("destination"));
+ // Make sure there are no messages stuck on queue from previous tests
+ template.receive("destination");
+
+ JmsComponent jmsComponent = new JmsComponent();
+ JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory, true);
+ consumerEndpoint.setTransacted("jms");
+ JmsProviderEndpoint providerEndpoint = createProviderEndpoint(connFactory);
+ jmsComponent.setEndpoints(new JmsEndpointType[] {consumerEndpoint, providerEndpoint});
+ container.activateComponent(jmsComponent, "servicemix-jms");
+
+ ReceiverComponent rcvr = new ReceiverComponent();
+ rcvr.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+ rcvr.setEndpoint("endpoint");
+ ActivationSpec asRcvr = new ActivationSpec("receiver", rcvr);
+ container.activateComponent(asRcvr);
+
+ InOnly exchange;
+ exchange = client.createInOnlyExchange();
+ exchange.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+ exchange.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ client.sendSync(exchange);
+ rcvr.getMessageList().assertMessagesReceived(1);
+
+ container.deactivateComponent("servicemix-jms");
+
+ assertNull("Message should not be on the queue", template.receive("destination"));
+
}
@@ -174,6 +211,8 @@
endpoint.setListenerType("simple");
endpoint.setConnectionFactory(connFactory);
endpoint.setDestinationName("destination");
+ endpoint.setRecoveryInterval(10000);
+ endpoint.setConcurrentConsumers(1);
endpoint.setTargetService(new QName("http://jms.servicemix.org/Test", "Echo"));
return endpoint;
}