You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by cc...@apache.org on 2009/03/25 06:26:43 UTC
svn commit: r758144 - in
/servicemix/components/bindings/servicemix-jms/trunk/src:
main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Author: ccustine
Date: Wed Mar 25 05:26:42 2009
New Revision: 758144
URL: http://svn.apache.org/viewvc?rev=758144&view=rev
Log:
SMXCOMP-486 - InOnly exchange does not rollback jms message on exchange error
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java?rev=758144&r1=758143&r2=758144&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java Wed Mar 25 05:26:42 2009
@@ -426,14 +426,6 @@
}
protected void processExchange(final MessageExchange exchange, final Session session, final JmsContext context) throws Exception {
- if (exchange instanceof InOnly) {
- // throw an exception when the exchange is transacted to ensure correct rollback of the transaction
- if (exchange.isTransacted() && ExchangeStatus.ERROR.equals(exchange.getStatus())) {
- throw exchange.getError();
- }
- // ignore non-transacted and/or DONE exchanges
- return;
- }
// Create session if needed
if (session == null) {
template.execute(new SessionCallback() {
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=758144&r1=758143&r2=758144&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java Wed Mar 25 05:26:42 2009
@@ -21,7 +21,10 @@
import javax.activation.DataHandler;
import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jms.ConnectionFactory;
import javax.mail.util.ByteArrayDataSource;
@@ -39,6 +42,7 @@
import org.apache.servicemix.jms.endpoints.DefaultProviderMarshaler;
import org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint;
import org.apache.servicemix.jms.endpoints.JmsProviderEndpoint;
+import org.springframework.jms.core.JmsTemplate;
public class JmsProviderConsumerEndpointTest extends AbstractJmsTestSupport {
@@ -61,7 +65,7 @@
InOut inout = null;
boolean result = false;
DataHandler dh = null;
-
+
// Test successful return
inout = client.createInOutExchange();
inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
@@ -76,16 +80,16 @@
assertNotNull(src);
dh = out.getAttachment("myImage");
assertNotNull(dh);
-
+
logger.info(new SourceTransformer().toString(src));
- // Test fault return
+ // Test fault return
container.deactivateComponent("receiver");
ReturnFaultComponent fault = new ReturnFaultComponent();
ActivationSpec asFault = new ActivationSpec("receiver", fault);
asFault.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
container.activateComponent(asFault);
-
+
inout = client.createInOutExchange();
inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
@@ -101,7 +105,7 @@
ActivationSpec asError = new ActivationSpec("receiver", error);
asError.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
container.activateComponent(asError);
-
+
inout = client.createInOutExchange();
inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
@@ -111,6 +115,69 @@
}
+ public void testProviderInOnlyWithJmsTransactions() throws Exception {
+ ConnectionFactory connFactory = new PooledConnectionFactory(connectionFactory);
+ JmsComponent jmsComponent = new JmsComponent();
+ JmsConsumerEndpoint consumerEndpoint = createInOnlyConsumerEndpoint(connFactory, true);
+ consumerEndpoint.setTransacted("jms");
+ JmsProviderEndpoint providerEndpoint = createProviderEndpoint(connFactory);
+ jmsComponent.setEndpoints(new JmsEndpointType[] {consumerEndpoint, providerEndpoint});
+ container.activateComponent(jmsComponent, "servicemix-jms");
+
+ final int[] receiveCount = new int[]{0};
+
+ ReturnErrorComponent error = new ReturnErrorComponent(new RuntimeException("Error: abort... abort...!!")) {
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ receiveCount[0]++;
+ super.onMessageExchange(exchange);
+ }
+ };
+
+ ActivationSpec asError = new ActivationSpec("receiver", error);
+ asError.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+ container.activateComponent(asError);
+
+ InOnly exchange = client.createInOnlyExchange();
+ exchange.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+ exchange.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ client.sendSync(exchange);
+
+ // Loop and wait for at least one attempt to process the message
+ for (int i = 0; i < 5; i++) {
+ Thread.sleep(1000);
+ if (receiveCount[0] > 0) {
+ break;
+ }
+ }
+
+ assertTrue("The message was never processed by servicemix-jms", receiveCount[0] > 0);
+
+ // Deactivate the JMS component so that it stops
+ // trying to get the message from the queue
+ container.deactivateComponent("servicemix-jms");
+
+ JmsTemplate template = new JmsTemplate(connFactory);
+ template.setReceiveTimeout(2000);
+ assertNotNull("Message should still be on the queue", template.receive("destination"));
+ }
+
+
+ private JmsConsumerEndpoint createInOnlyConsumerEndpoint(ConnectionFactory connFactory,
+ boolean rollbackOnError) throws URISyntaxException {
+ JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
+ endpoint.setService(new QName("http://jms.servicemix.org/Test", "Consumer"));
+ endpoint.setEndpoint("endpoint");
+ DefaultConsumerMarshaler marshaler = new DefaultConsumerMarshaler();
+ marshaler.setMep(new URI("http://www.w3.org/2004/08/wsdl/in-only"));
+ marshaler.setRollbackOnError(rollbackOnError);
+ endpoint.setMarshaler(marshaler);
+ endpoint.setListenerType("simple");
+ endpoint.setConnectionFactory(connFactory);
+ endpoint.setDestinationName("destination");
+ endpoint.setTargetService(new QName("http://jms.servicemix.org/Test", "Echo"));
+ return endpoint;
+ }
+
private JmsConsumerEndpoint createConsumerEndpoint(ConnectionFactory connFactory) throws URISyntaxException {
JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
endpoint.setService(new QName("http://jms.servicemix.org/Test", "Consumer"));
@@ -125,7 +192,7 @@
endpoint.setTargetService(new QName("http://jms.servicemix.org/Test", "Echo"));
return endpoint;
}
-
+
private JmsProviderEndpoint createProviderEndpoint(ConnectionFactory connFactory) {
JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
endpoint.setService(new QName("http://jms.servicemix.org/Test", "Provider"));