You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/30 14:25:55 UTC
svn commit: r959280 - in /camel/trunk/components:
camel-jms/src/test/java/org/apache/camel/component/jms/tx/
camel-spring/src/main/java/org/apache/camel/spring/spi/
Author: davsclaus
Date: Wed Jun 30 12:25:55 2010
New Revision: 959280
URL: http://svn.apache.org/viewvc?rev=959280&view=rev
Log:
CAMEL-2877: transacted works with async routing engine.
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTX2Test.java
- copied, changed from r959261, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollback2Test.java
- copied, changed from r959261, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java
Modified:
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTX2Test.java (from r959261, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTX2Test.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTX2Test.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java&r1=959261&r2=959280&rev=959280&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTX2Test.java Wed Jun 30 12:25:55 2010
@@ -28,7 +28,7 @@ import org.springframework.context.suppo
/**
* @version $Revision$
*/
-public class AsyncEndpointJmsTXTest extends CamelSpringTestSupport {
+public class AsyncEndpointJmsTX2Test extends CamelSpringTestSupport {
@Override
protected int getExpectedRouteCount() {
@@ -47,7 +47,7 @@ public class AsyncEndpointJmsTXTest exte
@Test
public void testAsyncEndpointOK() throws Exception {
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
- getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Hi Camel");
getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
template.sendBody("activemq:queue:inbox", "Hello Camel");
@@ -73,7 +73,7 @@ public class AsyncEndpointJmsTXTest exte
beforeThreadName = Thread.currentThread().getName();
}
})
- .to("async:Bye Camel")
+ .to("async:Hi Camel")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
afterThreadName = Thread.currentThread().getName();
@@ -81,8 +81,14 @@ public class AsyncEndpointJmsTXTest exte
})
.to("log:after")
.to("mock:after")
+ .to("direct:foo")
+ .to("log:result")
.to("mock:result");
+
+ from("direct:foo")
+ .transacted()
+ .to("async:Bye Camel");
}
};
}
-}
+}
\ No newline at end of file
Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollback2Test.java (from r959261, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollback2Test.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollback2Test.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java&r1=959261&r2=959280&rev=959280&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollbackTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AsyncEndpointJmsTXRollback2Test.java Wed Jun 30 12:25:55 2010
@@ -28,7 +28,7 @@ import org.springframework.context.suppo
/**
* @version $Revision$
*/
-public class AsyncEndpointJmsTXRollbackTest extends CamelSpringTestSupport {
+public class AsyncEndpointJmsTXRollback2Test extends CamelSpringTestSupport {
private static int invoked;
@@ -51,7 +51,7 @@ public class AsyncEndpointJmsTXRollbackT
invoked = 0;
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel", "Hello Camel");
- getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel", "Bye Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Hi Camel", "Hi Camel");
getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
template.sendBody("activemq:queue:inbox", "Hello Camel");
@@ -77,7 +77,7 @@ public class AsyncEndpointJmsTXRollbackT
beforeThreadName = Thread.currentThread().getName();
}
})
- .to("async:Bye Camel")
+ .to("async:Hi Camel")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
afterThreadName = Thread.currentThread().getName();
@@ -85,15 +85,22 @@ public class AsyncEndpointJmsTXRollbackT
})
.to("log:after")
.to("mock:after")
- .process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- invoked++;
- if (invoked < 2) {
- throw new IllegalArgumentException("Damn");
- }
- }
- })
+ .to("direct:foo")
.to("mock:result");
+
+ from("direct:foo")
+ .transacted()
+ .to("async:Bye Camel")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ invoked++;
+ if (invoked < 2) {
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+ });
+
+
}
};
}
Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java?rev=959280&r1=959279&r2=959280&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java Wed Jun 30 12:25:55 2010
@@ -23,7 +23,6 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.Logger;
import org.apache.camel.processor.RedeliveryErrorHandler;
import org.apache.camel.processor.RedeliveryPolicy;
@@ -85,10 +84,11 @@ public class TransactionErrorHandler ext
// using multiple threads to span a transaction
if (exchange.getUnitOfWork().isTransactedBy(transactionTemplate)) {
// already transacted by this transaction template
- // so lets just let the regular default error handler process it
- processByRegularErrorHandler(exchange);
+ // so lets just let the error handler process it
+ processByErrorHandler(exchange);
} else {
// not yet wrapped in transaction so lets do that
+ // and then have it invoke the error handler from within that transaction
processInTransaction(exchange);
}
}
@@ -108,16 +108,6 @@ public class TransactionErrorHandler ext
return true;
}
- protected void processByRegularErrorHandler(Exchange exchange) throws Exception {
- // must invoke the async method and provide an empty callback
- // to have it process by the error handler (because we invoke super)
- super.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // noop
- }
- });
- }
-
protected void processInTransaction(final Exchange exchange) throws Exception {
String id = ObjectHelper.getIdentityHashCode(transactionTemplate);
try {
@@ -147,6 +137,7 @@ public class TransactionErrorHandler ext
}
}
+
protected void doInTransactionTemplate(final Exchange exchange) {
// spring transaction template is working best with rollback if you throw it a runtime exception
@@ -160,45 +151,8 @@ public class TransactionErrorHandler ext
exchange.setProperty(Exchange.TRANSACTED, Boolean.TRUE);
- // and now let process the exchange
- // we have to wait if the async routing engine took over, because transactions have to be done in
- // the same thread (Spring TransactionManager) so by waiting until the async routing is done
- // will let us be able to continue routing thereafter in the same thread context
- final CountDownLatch latch = new CountDownLatch(1);
- boolean sync = TransactionErrorHandler.super.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- if (!doneSync) {
- if (log.isTraceEnabled()) {
- log.trace("Asynchronous callback received for exchangeId: " + exchange.getExchangeId());
- }
- latch.countDown();
- }
- }
-
- @Override
- public String toString() {
- return "Done TransactionErrorHandler";
- }
- });
- if (!sync) {
- if (log.isTraceEnabled()) {
- log.trace("Waiting for asynchronous callback before continuing for exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
- }
- try {
- latch.await();
- } catch (InterruptedException e) {
- if (log.isDebugEnabled()) {
- log.debug("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId(), e);
- }
- // we may be shutting down etc., so set exception
- if (exchange.getException() == null) {
- exchange.setException(e);
- }
- }
- if (log.isTraceEnabled()) {
- log.trace("Asynchronous callback received, will continue routing exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
- }
- }
+ // and now let process the exchange by the error handler
+ processByErrorHandler(exchange);
// after handling and still an exception or marked as rollback only then rollback
if (exchange.getException() != null || exchange.isRollbackOnly()) {
@@ -228,7 +182,53 @@ public class TransactionErrorHandler ext
});
}
- protected String propagationBehaviorToString(int propagationBehavior) {
+ /**
+ * Processes the {@link Exchange} using the error handler.
+ * <p/>
+ * This implementation ensures this occurs synchronously, that means if the async routing engine
+ * did kick in, then this implementation will wait for the task to complete before it continues.
+ *
+ * @param exchange the exchange
+ */
+ protected void processByErrorHandler(final Exchange exchange) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ boolean sync = super.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ if (!doneSync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Asynchronous callback received for exchangeId: " + exchange.getExchangeId());
+ }
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Done TransactionErrorHandler";
+ }
+ });
+ if (!sync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Waiting for asynchronous callback before continuing for exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
+ }
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId(), e);
+ }
+ // we may be shutting down etc., so set exception
+ if (exchange.getException() == null) {
+ exchange.setException(e);
+ }
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Asynchronous callback received, will continue routing exchangeId: " + exchange.getExchangeId() + " -> " + exchange);
+ }
+ }
+ }
+
+ private static String propagationBehaviorToString(int propagationBehavior) {
String rc;
switch (propagationBehavior) {
case TransactionDefinition.PROPAGATION_MANDATORY: