You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/30 11:00:58 UTC

svn commit: r959237 - in /camel/trunk/components: camel-jms/src/test/java/org/apache/camel/component/jms/tx/ camel-jms/src/test/resources/org/apache/camel/component/jms/tx/ camel-spring/src/main/java/org/apache/camel/spring/spi/

Author: davsclaus
Date: Wed Jun 30 09:00:57 2010
New Revision: 959237

URL: http://svn.apache.org/viewvc?rev=959237&view=rev
Log:
CAMEL-2877: Transacted error handler to support async routing engine.

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java   (with props)
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java   (with props)
    camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JmsTransacted-context.xml
      - copied, changed from r959215, camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml
Modified:
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java?rev=959237&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java Wed Jun 30 09:00:57 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.tx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.async.MyAsyncComponent;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointJmsTXRollbackTest extends CamelSpringTestSupport {
+
+    private static int invoked;
+
+    @Override
+    protected int getExpectedRouteCount() {
+        // no routes in Spring XML so return 0
+        return 0;
+    }
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/component/jms/tx/JmsTransacted-context.xml");
+    }
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+
+    @Test
+    public void testAsyncEndpointRollback() throws Exception {
+        invoked = 0;
+
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel", "Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel", "Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+        template.sendBody("activemq:queue:inbox", "Hello Camel");
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                from("activemq:queue:inbox")
+                    .transacted()
+                        .to("mock:before")
+                        .to("log:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                beforeThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("async:Bye Camel")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                afterThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("log:after")
+                        .to("mock:after")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                invoked++;
+                                if (invoked < 2) {
+                                    throw new IllegalArgumentException("Damn");
+                                }
+                            }
+                        })
+                        .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java?rev=959237&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java Wed Jun 30 09:00:57 2010
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.tx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.async.MyAsyncComponent;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointJmsTXTest extends CamelSpringTestSupport {
+
+    @Override
+    protected int getExpectedRouteCount() {
+        // no routes in Spring XML so return 0
+        return 0;
+    }
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/component/jms/tx/JmsTransacted-context.xml");
+    }
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+
+    @Test
+    public void testAsyncEndpointOK() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+        template.sendBody("activemq:queue:inbox", "Hello Camel");
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                from("activemq:queue:inbox")
+                    .transacted()
+                        .to("mock:before")
+                        .to("log:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                beforeThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("async:Bye Camel")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                afterThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("log:after")
+                        .to("mock:after")
+                        .to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JmsTransacted-context.xml (from r959215, camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JmsTransacted-context.xml?p2=camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JmsTransacted-context.xml&p1=camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml&r1=959215&r2=959237&rev=959237&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml (original)
+++ camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JmsTransacted-context.xml Wed Jun 30 09:00:57 2010
@@ -22,45 +22,18 @@
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
          http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
 
-    <!-- START SNIPPET: e1 -->
-    <!-- setup JMS connection factory -->
     <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
         <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
     </bean>
 
-    <!-- setup spring jms TX manager -->
     <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
         <property name="connectionFactory" ref="jmsConnectionFactory"/>
     </bean>
 
-    <!-- define our activemq component -->
     <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
         <property name="connectionFactory" ref="jmsConnectionFactory"/>
-        <!-- define the jms consumer/producer as transacted -->
         <property name="transacted" value="true"/>
-        <!-- setup the transaction manager to use -->
-        <!-- if not provided then Camel will automatic use a JmsTransactionManager, however if you
-             for instance use a JTA transaction manager then you must configure it -->
         <property name="transactionManager" ref="jmsTransactionManager"/>
     </bean>
-    <!-- END SNIPPET: e1 -->
-
-    <!-- START SNIPPET: e2 -->
-    <camelContext xmlns="http://camel.apache.org/schema/spring">
-        <route>
-            <!-- 1: from the jms queue -->
-            <from uri="activemq:queue:okay"/>
-            <!-- 2: mark this route as transacted -->
-            <transacted/>
-            <!-- 3: call our business logic that is myProcessor -->
-            <process ref="myProcessor"/>
-            <!-- 4: if success then send it to the mock -->
-            <to uri="mock:result"/>
-        </route>
-    </camelContext>
-
-    <bean id="myProcessor" class="org.apache.camel.component.jms.tx.JMSTransactionalClientTest$MyProcessor"/>
-    <!-- END SNIPPET: e2 -->
-
 
 </beans>

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java?rev=959237&r1=959236&r2=959237&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java Wed Jun 30 09:00:57 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spring.spi;
 
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -159,14 +161,43 @@ public class TransactionErrorHandler ext
                 exchange.setProperty(Exchange.TRANSACTED, Boolean.TRUE);
 
                 // and now let process the exchange
-                try {
-                    TransactionErrorHandler.super.process(exchange, new AsyncCallback() {
-                        public void done(boolean doneSync) {
-                            // noop
+                // we have to wait if the async routing engine took over, because transactions have to be done in
+                // the same thread (Spring TransactionManager); waiting until the async routing is done
+                // lets us continue routing thereafter in the same thread context
+                final CountDownLatch latch = new CountDownLatch(1);
+                boolean sync = TransactionErrorHandler.super.process(exchange, new AsyncCallback() {
+                    public void done(boolean doneSync) {
+                        if (!doneSync) {
+                            if (log.isTraceEnabled()) {
+                                log.trace("Asynchronous callback received for exchangeId: " + exchange.getExchangeId());
+                            }
+                            latch.countDown();
                         }
-                    });
-                } catch (Exception e) {
-                    exchange.setException(e);
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "Done TransactionErrorHandler";
+                    }
+                });
+                if (!sync) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Waiting for asynchronous callback before continuing for exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
+                    }
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId(), e);
+                        }
+                        // we may be shutting down etc., so set exception
+                        if (exchange.getException() == null) {
+                            exchange.setException(e);
+                        }
+                    }
+                    if (log.isTraceEnabled()) {
+                        log.trace("Asynchronous callback received, will continue routing exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
+                    }
                 }
 
                 // after handling and still an exception or marked as rollback only then rollback