You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/30 11:00:58 UTC
svn commit: r959237 - in /camel/trunk/components:
camel-jms/src/test/java/org/apache/camel/component/jms/tx/
camel-jms/src/test/resources/org/apache/camel/component/jms/tx/
camel-spring/src/main/java/org/apache/camel/spring/spi/
Author: davsclaus
Date: Wed Jun 30 09:00:57 2010
New Revision: 959237
URL: http://svn.apache.org/viewvc?rev=959237&view=rev
Log:
CAMEL-2877: Transacted error handler to support async routing engine.
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java (with props)
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java (with props)
camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JmsTransacted-context.xml
- copied, changed from r959215, camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml
Modified:
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java?rev=959237&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java Wed Jun 30 09:00:57 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.tx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.async.MyAsyncComponent;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Exercises a transacted JMS route that passes through an asynchronous endpoint
+ * and fails on its first pass.
+ * <p/>
+ * The route consumes from <tt>activemq:queue:inbox</tt>, is marked as transacted, sends to the
+ * <tt>async:Bye Camel</tt> endpoint and ends with a processor that throws an
+ * {@link IllegalArgumentException} the first time it is invoked. The mock expectations require
+ * <tt>mock:before</tt> and <tt>mock:after</tt> to see the message twice and <tt>mock:result</tt>
+ * to see it once. The test also asserts that the processors placed before and after the async
+ * endpoint ran on threads whose names differ (compared case-insensitively).
+ *
+ * @version $Revision$
+ */
+public class AsyncEndpointJmsTXRollbackTest extends CamelSpringTestSupport {
+
+ private static int invoked; // counts runs of the failing processor at the end of the route
+
+ @Override
+ protected int getExpectedRouteCount() {
+ // no routes in Spring XML so return 0
+ return 0;
+ }
+
+ @Override
+ protected AbstractXmlApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext("org/apache/camel/component/jms/tx/JmsTransacted-context.xml");
+ }
+
+ private static String beforeThreadName; // thread name recorded just before the async endpoint
+ private static String afterThreadName; // thread name recorded just after the async endpoint
+
+ @Test
+ public void testAsyncEndpointRollback() throws Exception {
+ invoked = 0; // reset the static counter so the first pass through the route fails
+
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel", "Hello Camel"); // expected twice: presumably the failed attempt plus its redelivery after rollback - confirm
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel", "Bye Camel"); // both passes get past the async endpoint before the failing processor runs
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel"); // only the pass that does not throw reaches the result
+
+ template.sendBody("activemq:queue:inbox", "Hello Camel");
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent()); // registers the "async" scheme used by the route below
+
+ from("activemq:queue:inbox")
+ .transacted()
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:Bye Camel") // presumably sets the body to "Bye Camel" via the async routing engine - see MyAsyncComponent
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ invoked++;
+ if (invoked < 2) { // fail on the first invocation only
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java?rev=959237&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java Wed Jun 30 09:00:57 2010
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.tx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.async.MyAsyncComponent;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Exercises a transacted JMS route that passes through an asynchronous endpoint
+ * and contains no failing step.
+ * <p/>
+ * The route consumes from <tt>activemq:queue:inbox</tt>, is marked as transacted and sends to the
+ * <tt>async:Bye Camel</tt> endpoint. The mock expectations require <tt>mock:before</tt> to see
+ * <tt>Hello Camel</tt> once and both <tt>mock:after</tt> and <tt>mock:result</tt> to see
+ * <tt>Bye Camel</tt> once. The test also asserts that the processors placed before and after the
+ * async endpoint ran on threads whose names differ (compared case-insensitively).
+ *
+ * @version $Revision$
+ */
+public class AsyncEndpointJmsTXTest extends CamelSpringTestSupport {
+
+ @Override
+ protected int getExpectedRouteCount() {
+ // no routes in Spring XML so return 0
+ return 0;
+ }
+
+ @Override
+ protected AbstractXmlApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext("org/apache/camel/component/jms/tx/JmsTransacted-context.xml");
+ }
+
+ private static String beforeThreadName; // thread name recorded just before the async endpoint
+ private static String afterThreadName; // thread name recorded just after the async endpoint
+
+ @Test
+ public void testAsyncEndpointOK() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel"); // body as sent, before the async endpoint
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel"); // body as seen after the async endpoint
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+ template.sendBody("activemq:queue:inbox", "Hello Camel");
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent()); // registers the "async" scheme used by the route below
+
+ from("activemq:queue:inbox")
+ .transacted()
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:Bye Camel") // presumably sets the body to "Bye Camel" via the async routing engine - see MyAsyncComponent
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .to("mock:result");
+ }
+ };
+ }
+}
Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JmsTransacted-context.xml (from r959215, camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JmsTransacted-context.xml?p2=camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JmsTransacted-context.xml&p1=camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml&r1=959215&r2=959237&rev=959237&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml (original)
+++ camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JmsTransacted-context.xml Wed Jun 30 09:00:57 2010
@@ -22,45 +22,18 @@
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
- <!-- START SNIPPET: e1 -->
- <!-- setup JMS connection factory -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
</bean>
- <!-- setup spring jms TX manager -->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
- <!-- define our activemq component -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
- <!-- define the jms consumer/producer as transacted -->
<property name="transacted" value="true"/>
- <!-- setup the transaction manager to use -->
- <!-- if not provided then Camel will automatic use a JmsTransactionManager, however if you
- for instance use a JTA transaction manager then you must configure it -->
<property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
- <!-- END SNIPPET: e1 -->
-
- <!-- START SNIPPET: e2 -->
- <camelContext xmlns="http://camel.apache.org/schema/spring">
- <route>
- <!-- 1: from the jms queue -->
- <from uri="activemq:queue:okay"/>
- <!-- 2: mark this route as transacted -->
- <transacted/>
- <!-- 3: call our business logic that is myProcessor -->
- <process ref="myProcessor"/>
- <!-- 4: if success then send it to the mock -->
- <to uri="mock:result"/>
- </route>
- </camelContext>
-
- <bean id="myProcessor" class="org.apache.camel.component.jms.tx.JMSTransactionalClientTest$MyProcessor"/>
- <!-- END SNIPPET: e2 -->
-
</beans>
Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java?rev=959237&r1=959236&r2=959237&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java Wed Jun 30 09:00:57 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.spring.spi;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -159,14 +161,43 @@ public class TransactionErrorHandler ext
exchange.setProperty(Exchange.TRANSACTED, Boolean.TRUE);
// and now let process the exchange
- try {
- TransactionErrorHandler.super.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // noop
+ // we have to wait if the async routing engine took over, because transactions have to be done in
+ // the same thread (Spring TransactionManager) so by waiting until the async routing is done
+ // will let us be able to continue routing thereafter in the same thread context
+ final CountDownLatch latch = new CountDownLatch(1);
+ boolean sync = TransactionErrorHandler.super.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ if (!doneSync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Asynchronous callback received for exchangeId: " + exchange.getExchangeId());
+ }
+ latch.countDown();
}
- });
- } catch (Exception e) {
- exchange.setException(e);
+ }
+
+ @Override
+ public String toString() {
+ return "Done TransactionErrorHandler";
+ }
+ });
+ if (!sync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Waiting for asynchronous callback before continuing for exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
+ }
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId(), e);
+ }
+ // we may be shutting down etc., so set exception
+ if (exchange.getException() == null) {
+ exchange.setException(e);
+ }
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Asynchronous callback received, will continue routing exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
+ }
}
// after handling and still an exception or marked as rollback only then rollback