You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/27 15:30:21 UTC

svn commit: r948830 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/CamelPostProcessorHelper.java test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java

Author: davsclaus
Date: Thu May 27 13:30:20 2010
New Revision: 948830

URL: http://svn.apache.org/viewvc?rev=948830&view=rev
Log:
CAMEL-2760: Fixed @Consume to use a unit of work (UoW) so that synchronization callbacks are invoked when the routing is done.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java?rev=948830&r1=948829&r2=948830&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java Thu May 27 13:30:20 2010
@@ -33,6 +33,7 @@ import org.apache.camel.ProducerTemplate
 import org.apache.camel.Service;
 import org.apache.camel.component.bean.BeanProcessor;
 import org.apache.camel.component.bean.ProxyHelper;
+import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -126,7 +127,8 @@ public class CamelPostProcessorHelper im
     protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
         BeanProcessor answer = new BeanProcessor(pojo, getCamelContext());
         answer.setMethodObject(method);
-        return answer;
+        // must ensure the consumer is executed in a unit of work so synchronization callbacks etc. are invoked
+        return new UnitOfWorkProcessor(answer);
     }
 
     protected Endpoint getEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java?rev=948830&r1=948829&r2=948830&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java Thu May 27 13:30:20 2010
@@ -35,6 +35,8 @@ import org.apache.camel.util.ObjectHelpe
  */
 public class CamelPostProcessorHelperTest extends ContextTestSupport {
 
+    private MySynchronization mySynchronization;
+
     public void testConstructor() {
         CamelPostProcessorHelper helper = new CamelPostProcessorHelper();
         assertNull(helper.getCamelContext());
@@ -69,6 +71,24 @@ public class CamelPostProcessorHelperTes
         assertMockEndpointsSatisfied();
     }
 
+    public void testConsumeSynchronization() throws Exception {
+        mySynchronization = new MySynchronization();
+        CamelPostProcessorHelper helper = new CamelPostProcessorHelper(context);
+
+        MyConsumeAndSynchronizationBean my = new MyConsumeAndSynchronizationBean();
+        Method method = my.getClass().getMethod("consumeSomething", String.class, Exchange.class);
+        helper.consumerInjection(method, my, "foo");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("Should have invoked onDone", true, mySynchronization.isOnDone());
+    }
+
     public void testEndpointInjectProducerTemplate() throws Exception {
         CamelPostProcessorHelper helper = new CamelPostProcessorHelper(context);
 
@@ -257,6 +277,30 @@ public class CamelPostProcessorHelperTes
         }
     }
 
+    public class MyConsumeAndSynchronizationBean {
+
+        @Consume(uri = "seda:foo")
+        public void consumeSomething(String body, Exchange exchange) {
+            exchange.addOnCompletion(mySynchronization);
+            assertEquals("Hello World", body);
+            template.sendBody("mock:result", body);
+        }
+    }
+
+    private class MySynchronization extends SynchronizationAdapter {
+
+        private boolean onDone;
+
+        @Override
+        public void onDone(Exchange exchange) {
+            onDone = true;
+        }
+
+        public boolean isOnDone() {
+            return onDone;
+        }
+    }
+
     public class MyEndpointInjectBeanProducerTemplate {
 
         private ProducerTemplate producer;