You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/06 12:40:38 UTC

svn commit: r1198342 - in /camel/branches/camel-2.8.x: ./ camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/

Author: davsclaus
Date: Sun Nov  6 11:40:38 2011
New Revision: 1198342

URL: http://svn.apache.org/viewvc?rev=1198342&view=rev
Log:
CAMEL-4625: Exchange property can control if UoWPorcessor process sync or async. Needed by some consumers to have fine grained control over this.

Added:
    camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java
      - copied, changed from r1198338, camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java
Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Nov  6 11:40:38 2011
@@ -1 +1 @@
-/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199
+/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199,1198338

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1198342&r1=1198341&r2=1198342&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/Exchange.java Sun Nov  6 11:40:38 2011
@@ -183,7 +183,8 @@ public interface Exchange {
     String TRACE_EVENT_EXCHANGE  = "CamelTraceEventExchange";
     String TRANSFER_ENCODING     = "Transfer-Encoding";
 
-    String UNIT_OF_WORK_EXHAUSTED = "CamelUnitOfWorkExhausted";
+    String UNIT_OF_WORK_EXHAUSTED    = "CamelUnitOfWorkExhausted";
+    String UNIT_OF_WORK_PROCESS_SYNC = "CamelUnitOfWorkProcessSync";
 
     String XSLT_FILE_NAME = "CamelXsltFileName";
 

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=1198342&r1=1198341&r2=1198342&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java Sun Nov  6 11:40:38 2011
@@ -24,6 +24,7 @@ import org.apache.camel.impl.DefaultUnit
 import org.apache.camel.impl.MDCUnitOfWork;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,34 +98,12 @@ public class UnitOfWorkProcessor extends
                 return true;
             }
 
-            // process the exchange
-            try {
-                return processor.process(exchange, new AsyncCallback() {
-                    public void done(boolean doneSync) {
-                        // Order here matters. We need to complete the callbacks
-                        // since they will likely update the exchange with some final results.
-                        try {
-                            callback.done(doneSync);
-                        } finally {
-                            doneUow(uow, exchange);
-                        }
-                    }
-                });
-            } catch (Throwable e) {
-                LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(), e);
-
-                // fallback and catch any exceptions the process may not have caught
-                // we must ensure to done the UoW in all cases and issue done on the callback
-                exchange.setException(e);
-
-                // Order here matters. We need to complete the callbacks
-                // since they will likely update the exchange with some final results.
-                try {
-                    callback.done(true);
-                } finally {
-                    doneUow(uow, exchange);
-                }
-                return true;
+            Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
+            if (synchronous != null) {
+                // the exchange signalled to process synchronously
+                return processSync(exchange, callback, uow);
+            } else {
+                return processAsync(exchange, callback, uow);
             }
         } else {
             // There was an existing UoW, so we should just pass through..
@@ -133,6 +112,59 @@ public class UnitOfWorkProcessor extends
         }
     }
 
+    protected boolean processSync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) {
+        LOG.trace("Exchange marked UnitOfWork to be processed synchronously: {}", exchange);
+
+        // process the exchange synchronously
+        try {
+            AsyncProcessorHelper.process(processor, exchange);
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+
+        try {
+            callback.done(true);
+        } finally {
+            doneUow(uow, exchange);
+        }
+
+        return true;
+    }
+
+    protected boolean processAsync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) {
+        LOG.trace("Processing exchange asynchronously: {}", exchange);
+
+        // process the exchange asynchronously
+        try {
+            return processor.process(exchange, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    // Order here matters. We need to complete the callbacks
+                    // since they will likely update the exchange with some final results.
+                    try {
+                        callback.done(doneSync);
+                    } finally {
+                        doneUow(uow, exchange);
+                    }
+                }
+            });
+        } catch (Throwable e) {
+            LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(), e);
+
+            // fallback and catch any exceptions the process may not have caught
+            // we must ensure to done the UoW in all cases and issue done on the callback
+            exchange.setException(e);
+
+            // Order here matters. We need to complete the callbacks
+            // since they will likely update the exchange with some final results.
+            try {
+                callback.done(true);
+            } finally {
+                doneUow(uow, exchange);
+            }
+            return true;
+        }
+    }
+
     /**
      * Strategy to create the unit of work for the given exchange.
      *

Copied: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java (from r1198338, camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java)
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java?p2=camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java&r1=1198338&r2=1198342&rev=1198342&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java Sun Nov  6 11:40:38 2011
@@ -22,7 +22,7 @@ import java.util.concurrent.Executors;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.impl.SynchronizationAdapter;
 
 /**
  *