You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/01/21 10:58:53 UTC

svn commit: r1436246 - in /camel/branches/camel-2.10.x: ./ camel-core/src/main/java/org/apache/camel/component/bean/ components/camel-spring/src/test/java/org/apache/camel/component/bean/ components/camel-spring/src/test/java/org/apache/camel/spring/bind/

Author: davsclaus
Date: Mon Jan 21 09:58:53 2013
New Revision: 1436246

URL: http://svn.apache.org/viewvc?rev=1436246&view=rev
Log:
CAMEL-5987: Using annotations DSL in beans may cause async callback to be called twice

Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
    camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
    camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/MethodInvocation.java
    camel/branches/camel-2.10.x/components/camel-spring/src/test/java/org/apache/camel/component/bean/BeanWithRecipientListTest.java
    camel/branches/camel-2.10.x/components/camel-spring/src/test/java/org/apache/camel/spring/bind/BeanInfoTest.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1436244

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java?rev=1436246&r1=1436245&r2=1436246&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java Mon Jan 21 09:58:53 2013
@@ -16,10 +16,6 @@
  */
 package org.apache.camel.component.bean;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
@@ -42,7 +38,6 @@ public class BeanProcessor extends Servi
     private static final transient Logger LOG = LoggerFactory.getLogger(BeanProcessor.class);
 
     private boolean multiParameterArray;
-    private Method methodObject;
     private String method;
     private BeanHolder beanHolder;
     private boolean shorthandMethod;
@@ -65,8 +60,7 @@ public class BeanProcessor extends Servi
 
     @Override
     public String toString() {
-        String description = methodObject != null ? " " + methodObject : "";
-        return "BeanProcessor[" + beanHolder + description + "]";
+        return "BeanProcessor[" + beanHolder + "]";
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -156,50 +150,15 @@ public class BeanProcessor extends Servi
             in.removeHeader(Exchange.BEAN_MULTI_PARAMETER_ARRAY);
             in.removeHeader(Exchange.BEAN_METHOD_NAME);
         }
-        if (invocation == null) {
-            throw new IllegalStateException("No method invocation could be created, no matching method could be found on: " + bean);
-        }
 
-        Object value;
-        try {
-            AtomicBoolean sync = new AtomicBoolean(true);
-            value = invocation.proceed(callback, sync);
-            if (!sync.get()) {
-                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
-                // the remainder of the routing will be completed async
-                // so we break out now, then the callback will be invoked which then continue routing from where we left here
-                return false;
-            }
-
-            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
-        } catch (InvocationTargetException e) {
-            // let's unwrap the exception when it's an invocation target exception
-            exchange.setException(e.getCause());
-            callback.done(true);
-            return true;
-        } catch (Throwable e) {
-            exchange.setException(e);
+        if (invocation == null) {
+            exchange.setException(new IllegalStateException("No method invocation could be created, no matching method could be found on: " + bean));
             callback.done(true);
             return true;
         }
 
-        // if the method returns something then set the value returned on the Exchange
-        if (!invocation.getMethod().getReturnType().equals(Void.TYPE) && value != Void.TYPE) {
-            if (exchange.getPattern().isOutCapable()) {
-                // force out creating if not already created (as its lazy)
-                LOG.debug("Setting bean invocation result on the OUT message: {}", value);
-                exchange.getOut().setBody(value);
-                // propagate headers
-                exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
-            } else {
-                // if not out then set it on the in
-                LOG.debug("Setting bean invocation result on the IN message: {}", value);
-                exchange.getIn().setBody(value);
-            }
-        }
-
-        callback.done(true);
-        return true;
+        // invoke invocation
+        return invocation.proceed(callback);
     }
 
     protected Processor getProcessor() {

Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java?rev=1436246&r1=1436245&r2=1436246&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java Mon Jan 21 09:58:53 2013
@@ -28,7 +28,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -217,7 +216,21 @@ public class MethodInfo {
                 return arguments;
             }
 
-            public Object proceed(AsyncCallback callback, AtomicBoolean doneSync) throws Exception {
+            public boolean proceed(AsyncCallback callback) {
+                try {
+                    return doProceed(callback);
+                } catch (InvocationTargetException e) {
+                    exchange.setException(e.getTargetException());
+                    callback.done(true);
+                    return true;
+                } catch (Throwable e) {
+                    exchange.setException(e);
+                    callback.done(true);
+                    return true;
+                }
+            }
+
+            private boolean doProceed(AsyncCallback callback) throws Exception {
                 // dynamic router should be invoked beforehand
                 if (dynamicRouter != null) {
                     if (!dynamicRouter.isStarted()) {
@@ -225,10 +238,7 @@ public class MethodInfo {
                     }
                     // use a expression which invokes the method to be used by dynamic router
                     Expression expression = new DynamicRouterExpression(pojo);
-                    boolean sync = dynamicRouter.doRoutingSlip(exchange, expression, callback);
-                    // must remember the done sync returned from the dynamic router
-                    doneSync.set(sync);
-                    return Void.TYPE;
+                    return dynamicRouter.doRoutingSlip(exchange, expression, callback);
                 }
 
                 // invoke pojo
@@ -242,24 +252,34 @@ public class MethodInfo {
                     if (!recipientList.isStarted()) {
                         ServiceHelper.startService(recipientList);
                     }
-                    boolean sync = recipientList.sendToRecipientList(exchange, result, callback);
-                    // must remember the done sync returned from the recipient list
-                    doneSync.set(sync);
-                    // we don't want to return the list of endpoints
-                    // return Void to indicate to BeanProcessor that there is no reply
-                    return Void.TYPE;
+                    return recipientList.sendToRecipientList(exchange, result, callback);
                 }
                 if (routingSlip != null) {
                     if (!routingSlip.isStarted()) {
                         ServiceHelper.startService(routingSlip);
                     }
-                    boolean sync = routingSlip.doRoutingSlip(exchange, result, callback);
-                    // must remember the done sync returned from the routing slip
-                    doneSync.set(sync);
-                    return Void.TYPE;
+                    return routingSlip.doRoutingSlip(exchange, result, callback);
+                }
+
+                // if the method returns something then set the value returned on the Exchange
+                if (!getMethod().getReturnType().equals(Void.TYPE) && result != Void.TYPE) {
+                    if (exchange.getPattern().isOutCapable()) {
+                        // force out creating if not already created (as its lazy)
+                        LOG.debug("Setting bean invocation result on the OUT message: {}", result);
+                        exchange.getOut().setBody(result);
+                        // propagate headers
+                        exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+                    } else {
+                        // if not out then set it on the in
+                        LOG.debug("Setting bean invocation result on the IN message: {}", result);
+                        exchange.getIn().setBody(result);
+                    }
                 }
 
-                return result;
+                // we did not use any of the eips, but just invoked the bean
+                // so notify the callback we are done synchronously
+                callback.done(true);
+                return true;
             }
 
             public Object getThis() {

Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/MethodInvocation.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/MethodInvocation.java?rev=1436246&r1=1436245&r2=1436246&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/MethodInvocation.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/bean/MethodInvocation.java Mon Jan 21 09:58:53 2013
@@ -18,7 +18,6 @@ package org.apache.camel.component.bean;
 
 import java.lang.reflect.AccessibleObject;
 import java.lang.reflect.Method;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.AsyncCallback;
 
@@ -33,7 +32,13 @@ public interface MethodInvocation {
 
     Object[] getArguments();
 
-    Object proceed(AsyncCallback callback, AtomicBoolean doneSync) throws Exception;
+    /**
+     * Proceed and invokes the method.
+     *
+     * @param callback   the callback
+     * @return see {@link org.apache.camel.AsyncProcessor#process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)}
+     */
+    boolean proceed(AsyncCallback callback);
 
     Object getThis();
 

Modified: camel/branches/camel-2.10.x/components/camel-spring/src/test/java/org/apache/camel/component/bean/BeanWithRecipientListTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-spring/src/test/java/org/apache/camel/component/bean/BeanWithRecipientListTest.java?rev=1436246&r1=1436245&r2=1436246&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-spring/src/test/java/org/apache/camel/component/bean/BeanWithRecipientListTest.java (original)
+++ camel/branches/camel-2.10.x/components/camel-spring/src/test/java/org/apache/camel/component/bean/BeanWithRecipientListTest.java Mon Jan 21 09:58:53 2013
@@ -19,15 +19,16 @@ package org.apache.camel.component.bean;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit38.AbstractJUnit38SpringContextTests;
+import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
 
 /**
  * @version 
  */
 @ContextConfiguration
-public class BeanWithRecipientListTest extends AbstractJUnit38SpringContextTests {
+public class BeanWithRecipientListTest extends AbstractJUnit4SpringContextTests {
     @Autowired
     protected ProducerTemplate template;
     @EndpointInject(uri = "mock:a")
@@ -37,6 +38,7 @@ public class BeanWithRecipientListTest e
 
     protected String body = "James";
 
+    @Test
     public void testSendBody() throws Exception {
         a.expectedBodiesReceived(body);
         b.expectedBodiesReceived(body);

Modified: camel/branches/camel-2.10.x/components/camel-spring/src/test/java/org/apache/camel/spring/bind/BeanInfoTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-spring/src/test/java/org/apache/camel/spring/bind/BeanInfoTest.java?rev=1436246&r1=1436245&r2=1436246&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-spring/src/test/java/org/apache/camel/spring/bind/BeanInfoTest.java (original)
+++ camel/branches/camel-2.10.x/components/camel-spring/src/test/java/org/apache/camel/spring/bind/BeanInfoTest.java Mon Jan 21 09:58:53 2013
@@ -43,14 +43,14 @@ public class BeanInfoTest extends TestCa
         assertNotNull("Should have found a method invocation!", invocation);
 
         AtomicBoolean sync = new AtomicBoolean(true);
-        Object value = invocation.proceed(new AsyncCallback() {
+        invocation.proceed(new AsyncCallback() {
             public void done(boolean doneSync) {
                 // nnop
             }
-        }, sync);
+        });
 
         assertEquals(true, sync.get());
-        assertEquals("Hello James!", value);
+        assertEquals("Hello James!", exchange.getIn().getBody());
     }
 
     public void testBeanProcessor() throws Exception {