You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/30 14:25:55 UTC

svn commit: r959280 - in /camel/trunk/components: camel-jms/src/test/java/org/apache/camel/component/jms/tx/ camel-spring/src/main/java/org/apache/camel/spring/spi/

Author: davsclaus
Date: Wed Jun 30 12:25:55 2010
New Revision: 959280

URL: http://svn.apache.org/viewvc?rev=959280&view=rev
Log:
CAMEL-2877: transacted works with async routing engine.

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTX2Test.java
      - copied, changed from r959261, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollback2Test.java
      - copied, changed from r959261, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java
Modified:
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTX2Test.java (from r959261, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTX2Test.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTX2Test.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java&r1=959261&r2=959280&rev=959280&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTX2Test.java Wed Jun 30 12:25:55 2010
@@ -28,7 +28,7 @@ import org.springframework.context.suppo
 /**
  * @version $Revision$
  */
-public class AsyncEndpointJmsTXTest extends CamelSpringTestSupport {
+public class AsyncEndpointJmsTX2Test extends CamelSpringTestSupport {
 
     @Override
     protected int getExpectedRouteCount() {
@@ -47,7 +47,7 @@ public class AsyncEndpointJmsTXTest exte
     @Test
     public void testAsyncEndpointOK() throws Exception {
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
-        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Hi Camel");
         getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
 
         template.sendBody("activemq:queue:inbox", "Hello Camel");
@@ -73,7 +73,7 @@ public class AsyncEndpointJmsTXTest exte
                                 beforeThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .to("async:Bye Camel")
+                        .to("async:Hi Camel")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 afterThreadName = Thread.currentThread().getName();
@@ -81,8 +81,14 @@ public class AsyncEndpointJmsTXTest exte
                         })
                         .to("log:after")
                         .to("mock:after")
+                        .to("direct:foo")
+                        .to("log:result")
                         .to("mock:result");
+
+                from("direct:foo")
+                    .transacted()
+                    .to("async:Bye Camel");
             }
         };
     }
-}
+}
\ No newline at end of file

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollback2Test.java (from r959261, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollback2Test.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollback2Test.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java&r1=959261&r2=959280&rev=959280&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollback2Test.java Wed Jun 30 12:25:55 2010
@@ -28,7 +28,7 @@ import org.springframework.context.suppo
 /**
  * @version $Revision$
  */
-public class AsyncEndpointJmsTXRollbackTest extends CamelSpringTestSupport {
+public class AsyncEndpointJmsTXRollback2Test extends CamelSpringTestSupport {
 
     private static int invoked;
 
@@ -51,7 +51,7 @@ public class AsyncEndpointJmsTXRollbackT
         invoked = 0;
 
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel", "Hello Camel");
-        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel", "Bye Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Hi Camel", "Hi Camel");
         getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
 
         template.sendBody("activemq:queue:inbox", "Hello Camel");
@@ -77,7 +77,7 @@ public class AsyncEndpointJmsTXRollbackT
                                 beforeThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .to("async:Bye Camel")
+                        .to("async:Hi Camel")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 afterThreadName = Thread.currentThread().getName();
@@ -85,15 +85,22 @@ public class AsyncEndpointJmsTXRollbackT
                         })
                         .to("log:after")
                         .to("mock:after")
-                        .process(new Processor() {
-                            public void process(Exchange exchange) throws Exception {
-                                invoked++;
-                                if (invoked < 2) {
-                                    throw new IllegalArgumentException("Damn");
-                                }
-                            }
-                        })
+                        .to("direct:foo")
                         .to("mock:result");
+
+                from("direct:foo")
+                    .transacted()
+                    .to("async:Bye Camel")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            invoked++;
+                            if (invoked < 2) {
+                                throw new IllegalArgumentException("Damn");
+                            }
+                        }
+                    });
+
+
             }
         };
     }

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java?rev=959280&r1=959279&r2=959280&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java Wed Jun 30 12:25:55 2010
@@ -23,7 +23,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.processor.Logger;
 import org.apache.camel.processor.RedeliveryErrorHandler;
 import org.apache.camel.processor.RedeliveryPolicy;
@@ -85,10 +84,11 @@ public class TransactionErrorHandler ext
         // using multiple threads to span a transaction
         if (exchange.getUnitOfWork().isTransactedBy(transactionTemplate)) {
             // already transacted by this transaction template
-            // so lets just let the regular default error handler process it
-            processByRegularErrorHandler(exchange);
+            // so lets just let the error handler process it
+            processByErrorHandler(exchange);
         } else {
             // not yet wrapped in transaction so lets do that
+            // and then have it invoke the error handler from within that transaction
             processInTransaction(exchange);
         }
     }
@@ -108,16 +108,6 @@ public class TransactionErrorHandler ext
         return true;
     }
 
-    protected void processByRegularErrorHandler(Exchange exchange) throws Exception {
-        // must invoke the async method and provide an empty callback
-        // to have it process by the error handler (because we invoke super)
-        super.process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                // noop
-            }
-        });
-    }
-
     protected void processInTransaction(final Exchange exchange) throws Exception {
         String id = ObjectHelper.getIdentityHashCode(transactionTemplate);
         try {
@@ -147,6 +137,7 @@ public class TransactionErrorHandler ext
         }
     }
 
+
     protected void doInTransactionTemplate(final Exchange exchange) {
 
         // spring transaction template is working best with rollback if you throw it a runtime exception
@@ -160,45 +151,8 @@ public class TransactionErrorHandler ext
 
                 exchange.setProperty(Exchange.TRANSACTED, Boolean.TRUE);
 
-                // and now let process the exchange
-                // we have to wait if the async routing engine took over, because transactions have to be done in
-                // the same thread (Spring TransactionManager) so by waiting until the async routing is done
-                // will let us be able to continue routing thereafter in the same thread context
-                final CountDownLatch latch = new CountDownLatch(1);
-                boolean sync = TransactionErrorHandler.super.process(exchange, new AsyncCallback() {
-                    public void done(boolean doneSync) {
-                        if (!doneSync) {
-                            if (log.isTraceEnabled()) {
-                                log.trace("Asynchronous callback received for exchangeId: " + exchange.getExchangeId());
-                            }
-                            latch.countDown();
-                        }
-                    }
-
-                    @Override
-                    public String toString() {
-                        return "Done TransactionErrorHandler";
-                    }
-                });
-                if (!sync) {
-                    if (log.isTraceEnabled()) {
-                        log.trace("Waiting for asynchronous callback before continuing for exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
-                    }
-                    try {
-                        latch.await();
-                    } catch (InterruptedException e) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId(), e);
-                        }
-                        // we may be shutting down etc., so set exception
-                        if (exchange.getException() == null) {
-                            exchange.setException(e);
-                        }
-                    }
-                    if (log.isTraceEnabled()) {
-                        log.trace("Asynchronous callback received, will continue routing exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
-                    }
-                }
+                // and now let the error handler process the exchange
+                processByErrorHandler(exchange);
 
                 // after handling and still an exception or marked as rollback only then rollback
                 if (exchange.getException() != null || exchange.isRollbackOnly()) {
@@ -228,7 +182,53 @@ public class TransactionErrorHandler ext
         });
     }
 
-    protected String propagationBehaviorToString(int propagationBehavior) {
+    /**
+     * Processes the {@link Exchange} using the error handler.
+     * <p/>
+     * This implementation ensures that processing occurs synchronously: if the async routing engine
+     * did kick in, then this implementation will wait for the task to complete before it continues.
+     *
+     * @param exchange the exchange
+     */
+    protected void processByErrorHandler(final Exchange exchange) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        boolean sync = super.process(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                if (!doneSync) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Asynchronous callback received for exchangeId: " + exchange.getExchangeId());
+                    }
+                    latch.countDown();
+                }
+            }
+
+            @Override
+            public String toString() {
+                return "Done TransactionErrorHandler";
+            }
+        });
+        if (!sync) {
+            if (log.isTraceEnabled()) {
+                log.trace("Waiting for asynchronous callback before continuing for exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
+            }
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId(), e);
+                }
+                // we may be shutting down etc., so set exception
+                if (exchange.getException() == null) {
+                    exchange.setException(e);
+                }
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Asynchronous callback received, will continue routing exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
+            }
+        }
+    }
+
+    private static String propagationBehaviorToString(int propagationBehavior) {
         String rc;
         switch (propagationBehavior) {
         case TransactionDefinition.PROPAGATION_MANDATORY: