You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/11/24 09:23:59 UTC
svn commit: r478801 - in /incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms: JmsEndpoint.java jca/JcaConsumerProcessor.java
Author: gnodet
Date: Fri Nov 24 00:23:58 2006
New Revision: 478801
URL: http://svn.apache.org/viewvc?view=rev&rev=478801
Log:
SM-756: The jms/jca consumer endpoint should be able to roll back transactions when an exchange with an ERROR status comes back
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java
Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java?view=diff&rev=478801&r1=478800&r2=478801
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java (original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java Fri Nov 24 00:23:58 2006
@@ -71,6 +71,8 @@
protected ActivationSpec activationSpec;
protected BootstrapContext bootstrapContext;
protected boolean synchronous;
+ protected boolean rollbackOnError;
+
// Other configuration flags
protected boolean needJavaIdentifiers;
@@ -106,6 +108,20 @@
*/
public void setSynchronous(boolean synchronous) {
this.synchronous = synchronous;
+ }
+
+ /**
+ * @return the rollbackOnError
+ */
+ public boolean isRollbackOnError() {
+ return rollbackOnError;
+ }
+
+ /**
+ * @param rollbackOnError the rollbackOnError to set
+ */
+ public void setRollbackOnError(boolean rollbackOnError) {
+ this.rollbackOnError = rollbackOnError;
}
/**
Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java?view=diff&rev=478801&r1=478800&r2=478801
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java (original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java Fri Nov 24 00:23:58 2006
@@ -133,18 +133,43 @@
try {
if (exchange.getStatus() == ExchangeStatus.DONE) {
return;
- }
- connection = connectionFactory.createConnection();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- response = fromNMSResponse(exchange, context, session);
- if (response != null) {
- MessageProducer producer = session.createProducer(message.getJMSReplyTo());
- if (endpoint.isUseMsgIdInResponse()) {
- response.setJMSCorrelationID(message.getJMSMessageID());
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ if (endpoint.isRollbackOnError()) {
+ TransactionManager tm = (TransactionManager) endpoint.getServiceUnit().getComponent().getComponentContext().getTransactionManager();
+ tm.setRollbackOnly();
+ return;
+ } else if (exchange instanceof InOnly) {
+ log.info("Exchange in error: " + exchange, exchange.getError());
+ return;
} else {
- response.setJMSCorrelationID(message.getJMSCorrelationID());
+ connection = connectionFactory.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Exception error = exchange.getError();
+ if (error == null) {
+ error = new Exception("Exchange in error");
+ }
+ response = session.createObjectMessage(error);
+ MessageProducer producer = session.createProducer(message.getJMSReplyTo());
+ if (endpoint.isUseMsgIdInResponse()) {
+ response.setJMSCorrelationID(message.getJMSMessageID());
+ } else {
+ response.setJMSCorrelationID(message.getJMSCorrelationID());
+ }
+ producer.send(response);
+ }
+ } else {
+ connection = connectionFactory.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ response = fromNMSResponse(exchange, context, session);
+ if (response != null) {
+ MessageProducer producer = session.createProducer(message.getJMSReplyTo());
+ if (endpoint.isUseMsgIdInResponse()) {
+ response.setJMSCorrelationID(message.getJMSMessageID());
+ } else {
+ response.setJMSCorrelationID(message.getJMSCorrelationID());
+ }
+ producer.send(response);
}
- producer.send(response);
}
} finally {
if (connection != null) {