You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2012/11/26 05:32:18 UTC
svn commit: r1413478 -
/servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
Author: ffang
Date: Mon Nov 26 04:32:18 2012
New Revision: 1413478
URL: http://svn.apache.org/viewvc?rev=1413478&view=rev
Log:
[SMXCOMP-956]AsyncBaseLifeCycle should be able to return Error MessageExchange if provider threadpool is run out
Modified:
servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
Modified: servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?rev=1413478&r1=1413477&r2=1413478&view=diff
==============================================================================
--- servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java (original)
+++ servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java Mon Nov 26 04:32:18 2012
@@ -336,10 +336,15 @@ public class AsyncBaseLifeCycle implemen
polling.notify();
}
Executor executor = null;
+ ExchangeStatus oldStatus = null;
+ MessageExchange newExchange = null;
while (running.get()) {
try {
final MessageExchange exchange = channel.accept(1000L);
+
if (exchange != null) {
+ newExchange = exchange;
+ oldStatus = exchange.getStatus();
executor = exchange.getRole().equals(Role.CONSUMER) ? consumerExecutor : providerExecutor;
final Transaction tx = (Transaction) exchange
.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
@@ -367,6 +372,51 @@ public class AsyncBaseLifeCycle implemen
} else {
logger.error("Error polling delivery channel", t);
}
+ try {
+ // If we are transacted, check if this exception should
+ // rollback the transaction
+ if (transactionManager != null && transactionManager.getStatus() == Status.STATUS_ACTIVE) {
+ if (exceptionShouldRollbackTx(t)) {
+ transactionManager.setRollbackOnly();
+ }
+ if (!container.handleTransactions()) {
+ transactionManager.suspend();
+ }
+ }
+ if (oldStatus == ExchangeStatus.ACTIVE) {
+ newExchange.setStatus(ExchangeStatus.ERROR);
+ newExchange.setError(t instanceof Exception ? (Exception) t : new Exception(t));
+ channel.send(newExchange);
+ }
+ } catch (Exception inner) {
+ logger.error("Error setting exchange status to ERROR", inner);
+ }
+ } finally {
+ try {
+ // Check transaction status
+ if (newExchange != null) {
+ Transaction tx = (Transaction)newExchange
+ .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
+ if (tx != null) {
+ int status = transactionManager.getStatus();
+ // We use pull delivery, so the transaction should already
+ // have been transfered to another thread because the
+ // component
+ // must have answered.
+ if (status != Status.STATUS_NO_TRANSACTION) {
+ logger
+ .error("Transaction is still active after exchange processing. Trying to rollback transaction.");
+ try {
+ transactionManager.rollback();
+ } catch (Throwable t) {
+ logger.error("Error trying to rollback transaction.", t);
+ }
+ }
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("Error checking transaction status.", t);
+ }
}
}
synchronized (polling) {