You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2012/11/26 05:32:18 UTC

svn commit: r1413478 - /servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java

Author: ffang
Date: Mon Nov 26 04:32:18 2012
New Revision: 1413478

URL: http://svn.apache.org/viewvc?rev=1413478&view=rev
Log:
[SMXCOMP-956] AsyncBaseLifeCycle should be able to return an Error MessageExchange if the provider thread pool is exhausted

Modified:
    servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java

Modified: servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?rev=1413478&r1=1413477&r2=1413478&view=diff
==============================================================================
--- servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java (original)
+++ servicemix/components/trunk/shared-libraries/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java Mon Nov 26 04:32:18 2012
@@ -336,10 +336,15 @@ public class AsyncBaseLifeCycle implemen
             polling.notify();
         }
         Executor executor = null;
+        ExchangeStatus oldStatus = null;
+        MessageExchange newExchange = null;
         while (running.get()) {
             try {
                 final MessageExchange exchange = channel.accept(1000L);
+                
                 if (exchange != null) {
+                    newExchange = exchange;
+                    oldStatus = exchange.getStatus();
                     executor = exchange.getRole().equals(Role.CONSUMER) ? consumerExecutor : providerExecutor;
                     final Transaction tx = (Transaction) exchange
                             .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
@@ -367,6 +372,51 @@ public class AsyncBaseLifeCycle implemen
                 } else {
                     logger.error("Error polling delivery channel", t);
                 }
+                try {
+                    // If we are transacted, check if this exception should
+                    // rollback the transaction
+                    if (transactionManager != null && transactionManager.getStatus() == Status.STATUS_ACTIVE) {
+                        if (exceptionShouldRollbackTx(t)) {
+                            transactionManager.setRollbackOnly();
+                        }
+                        if (!container.handleTransactions()) {
+                            transactionManager.suspend();
+                        }
+                    }
+                    if (oldStatus == ExchangeStatus.ACTIVE) {
+                            newExchange.setStatus(ExchangeStatus.ERROR);
+                            newExchange.setError(t instanceof Exception ? (Exception) t : new Exception(t));
+                        channel.send(newExchange);
+                    }
+                } catch (Exception inner) {
+                    logger.error("Error setting exchange status to ERROR", inner);
+                }
+            } finally {
+                try {
+                    // Check transaction status
+                    if (newExchange != null) {
+                        Transaction tx = (Transaction)newExchange
+                            .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
+                        if (tx != null) {
+                            int status = transactionManager.getStatus();
+                            // We use pull delivery, so the transaction should already
+                            // have been transfered to another thread because the
+                            // component
+                            // must have answered.
+                            if (status != Status.STATUS_NO_TRANSACTION) {
+                                logger
+                                    .error("Transaction is still active after exchange processing. Trying to rollback transaction.");
+                                try {
+                                    transactionManager.rollback();
+                                } catch (Throwable t) {
+                                    logger.error("Error trying to rollback transaction.", t);
+                                }
+                            }
+                        }
+                    }
+                } catch (Throwable t) {
+                    logger.error("Error checking transaction status.", t);
+                }
             }
         }
         synchronized (polling) {