You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/06 12:40:38 UTC
svn commit: r1198342 - in /camel/branches/camel-2.8.x: ./
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/test/java/org/apache/camel/
Author: davsclaus
Date: Sun Nov 6 11:40:38 2011
New Revision: 1198342
URL: http://svn.apache.org/viewvc?rev=1198342&view=rev
Log:
CAMEL-4625: Exchange property can control if UoWPorcessor process sync or async. Needed by some consumers to have fine grained control over this.
Added:
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java
- copied, changed from r1198338, camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java
Modified:
camel/branches/camel-2.8.x/ (props changed)
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Nov 6 11:40:38 2011
@@ -1 +1 @@
-/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199
+/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199,1198338
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1198342&r1=1198341&r2=1198342&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/Exchange.java Sun Nov 6 11:40:38 2011
@@ -183,7 +183,8 @@ public interface Exchange {
String TRACE_EVENT_EXCHANGE = "CamelTraceEventExchange";
String TRANSFER_ENCODING = "Transfer-Encoding";
- String UNIT_OF_WORK_EXHAUSTED = "CamelUnitOfWorkExhausted";
+ String UNIT_OF_WORK_EXHAUSTED = "CamelUnitOfWorkExhausted";
+ String UNIT_OF_WORK_PROCESS_SYNC = "CamelUnitOfWorkProcessSync";
String XSLT_FILE_NAME = "CamelXsltFileName";
Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=1198342&r1=1198341&r2=1198342&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java Sun Nov 6 11:40:38 2011
@@ -24,6 +24,7 @@ import org.apache.camel.impl.DefaultUnit
import org.apache.camel.impl.MDCUnitOfWork;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.util.AsyncProcessorHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,34 +98,12 @@ public class UnitOfWorkProcessor extends
return true;
}
- // process the exchange
- try {
- return processor.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // Order here matters. We need to complete the callbacks
- // since they will likely update the exchange with some final results.
- try {
- callback.done(doneSync);
- } finally {
- doneUow(uow, exchange);
- }
- }
- });
- } catch (Throwable e) {
- LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(), e);
-
- // fallback and catch any exceptions the process may not have caught
- // we must ensure to done the UoW in all cases and issue done on the callback
- exchange.setException(e);
-
- // Order here matters. We need to complete the callbacks
- // since they will likely update the exchange with some final results.
- try {
- callback.done(true);
- } finally {
- doneUow(uow, exchange);
- }
- return true;
+ Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
+ if (synchronous != null) {
+ // the exchange signalled to process synchronously
+ return processSync(exchange, callback, uow);
+ } else {
+ return processAsync(exchange, callback, uow);
}
} else {
// There was an existing UoW, so we should just pass through..
@@ -133,6 +112,59 @@ public class UnitOfWorkProcessor extends
}
}
+ protected boolean processSync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) {
+ LOG.trace("Exchange marked UnitOfWork to be processed synchronously: {}", exchange);
+
+ // process the exchange synchronously
+ try {
+ AsyncProcessorHelper.process(processor, exchange);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+
+ try {
+ callback.done(true);
+ } finally {
+ doneUow(uow, exchange);
+ }
+
+ return true;
+ }
+
+ protected boolean processAsync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) {
+ LOG.trace("Processing exchange asynchronously: {}", exchange);
+
+ // process the exchange asynchronously
+ try {
+ return processor.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // Order here matters. We need to complete the callbacks
+ // since they will likely update the exchange with some final results.
+ try {
+ callback.done(doneSync);
+ } finally {
+ doneUow(uow, exchange);
+ }
+ }
+ });
+ } catch (Throwable e) {
+ LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(), e);
+
+ // fallback and catch any exceptions the process may not have caught
+ // we must ensure to done the UoW in all cases and issue done on the callback
+ exchange.setException(e);
+
+ // Order here matters. We need to complete the callbacks
+ // since they will likely update the exchange with some final results.
+ try {
+ callback.done(true);
+ } finally {
+ doneUow(uow, exchange);
+ }
+ return true;
+ }
+ }
+
/**
* Strategy to create the unit of work for the given exchange.
*
Copied: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java (from r1198338, camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java)
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java?p2=camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java&r1=1198338&r2=1198342&rev=1198342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java Sun Nov 6 11:40:38 2011
@@ -22,7 +22,7 @@ import java.util.concurrent.Executors;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.impl.SynchronizationAdapter;
/**
*