You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/03/31 11:54:38 UTC

svn commit: r1087229 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/bean/ test/java/org/apache/camel/component/bean/

Author: davsclaus
Date: Thu Mar 31 09:54:38 2011
New Revision: 1087229

URL: http://svn.apache.org/viewvc?rev=1087229&view=rev
Log:
CAMEL-3790: Camel proxy now support asynchronous clients using Future handles.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java?rev=1087229&r1=1087228&r2=1087229&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java Thu Mar 31 09:54:38 2011
@@ -18,10 +18,19 @@ package org.apache.camel.component.bean;
 
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultExchange;
@@ -38,6 +47,9 @@ import org.slf4j.LoggerFactory;
 public class CamelInvocationHandler implements InvocationHandler {
     private static final transient Logger LOG = LoggerFactory.getLogger(CamelInvocationHandler.class);
 
+    // use a static thread pool to not create a new thread pool for each invocation
+    private static ExecutorService executorService;
+
     private final Endpoint endpoint;
     private final Producer producer;
     private final MethodInfoCache methodInfoCache;
@@ -48,26 +60,61 @@ public class CamelInvocationHandler impl
         this.methodInfoCache = methodInfoCache;
     }
 
-    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+    public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
         BeanInvocation invocation = new BeanInvocation(method, args);
-        ExchangePattern pattern = ExchangePattern.InOut;
         MethodInfo methodInfo = methodInfoCache.getMethodInfo(method);
-        if (methodInfo != null) {
-            pattern = methodInfo.getPattern();
-        }
-        Exchange exchange = new DefaultExchange(endpoint, pattern);
+
+        final ExchangePattern pattern = methodInfo != null ? methodInfo.getPattern() : ExchangePattern.InOut;
+        final Exchange exchange = new DefaultExchange(endpoint, pattern);
         exchange.getIn().setBody(invocation);
 
-        // process the exchange
-        LOG.trace("Proxied method call {} invoking producer: {}", method.getName(), producer);
-        producer.process(exchange);
+        // is the return type a future
+        final boolean isFuture = method.getReturnType() == Future.class;
+
+        // create task to execute the proxy and gather the reply
+        FutureTask task = new FutureTask<Object>(new Callable<Object>() {
+            public Object call() throws Exception {
+                // process the exchange
+                LOG.trace("Proxied method call {} invoking producer: {}", method.getName(), producer);
+                producer.process(exchange);
+
+                Object answer = afterInvoke(method, exchange, pattern, isFuture);
+                LOG.trace("Proxied method call {} returning: {}", method.getName(), answer);
+                return answer;
+            }
+        });
+
+        if (isFuture) {
+            // submit task and return future
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Submitting task for exchange id {}", exchange.getExchangeId());
+            }
+            getExecutorService(exchange.getContext()).submit(task);
+            return task;
+        } else {
+            // execute task now
+            try {
+                task.run();
+                return task.get();
+            } catch (ExecutionException e) {
+                // we don't want the wrapped exception from JDK
+                throw e.getCause();
+            }
+        }
+    }
 
+    protected Object afterInvoke(Method method, Exchange exchange, ExchangePattern pattern, boolean isFuture) throws Exception {
         // check if we had an exception
         Throwable cause = exchange.getException();
         if (cause != null) {
             Throwable found = findSuitableException(cause, method);
             if (found != null) {
-                throw found;
+                if (found instanceof Exception) {
+                    throw (Exception) found;
+                } else {
+                    // wrap as exception
+                    throw new CamelExchangeException("Error processing exchange", exchange, cause);
+                }
             }
             // special for runtime camel exceptions as they can be nested
             if (cause instanceof RuntimeCamelException) {
@@ -78,28 +125,73 @@ public class CamelInvocationHandler impl
                 throw (RuntimeCamelException) cause;
             }
             // okay just throw the exception as is
-            throw cause;
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            } else {
+                // wrap as exception
+                throw new CamelExchangeException("Error processing exchange", exchange, cause);
+            }
         }
 
-        // do not return a reply if the method is VOID or the MEP is not OUT capable
+        // do not return a reply if the method is VOID
         Class<?> to = method.getReturnType();
-        if (to == Void.TYPE || !pattern.isOutCapable()) {
-            return null;
-        }
-
-        // only convert if there is a body
-        if (!exchange.hasOut() || exchange.getOut().getBody() == null) {
-            // there is no body so return null
+        if (to == Void.TYPE) {
             return null;
         }
 
         // use type converter so we can convert output in the desired type defined by the method
         // and let it be mandatory so we know wont return null if we cant convert it to the defined type
-        Object answer = exchange.getOut().getMandatoryBody(to);
-        LOG.trace("Proxied method call {} returning: {}", method.getName(), answer);
+        Object answer;
+        if (!isFuture) {
+            answer = getBody(exchange, to);
+        } else {
+            // if its a Future then we need to extract the class from the future type so we know
+            // which class to return the result as
+            Class<?> returnTo = getGenericType(exchange.getContext(), method.getGenericReturnType());
+            answer = getBody(exchange, returnTo);
+        }
+
         return answer;
     }
 
+    private static Object getBody(Exchange exchange, Class<?> type) throws InvalidPayloadException {
+        // get the body from the Exchange from either OUT or IN
+        if (exchange.hasOut()) {
+            if (exchange.getOut().getBody() != null) {
+                return exchange.getOut().getMandatoryBody(type);
+            } else {
+                return null;
+            }
+        } else {
+            if (exchange.getIn().getBody() != null) {
+                return exchange.getIn().getMandatoryBody(type);
+            } else {
+                return null;
+            }
+        }
+    }
+
+    protected static Class getGenericType(CamelContext context, Type type) throws ClassNotFoundException {
+        if (type == null) {
+            // fallback and use object
+            return Object.class;
+        }
+
+        // unfortunately java dont provide a nice api for getting the generic type of the return type
+        // due type erasure, so we have to gather it based on a String representation
+        String name = ObjectHelper.between(type.toString(), "<", ">");
+        if (name != null) {
+            if (name.contains("<")) {
+                // we only need the outer type
+                name = ObjectHelper.before(name, "<");
+            }
+            return context.getClassResolver().resolveMandatoryClass(name);
+        } else {
+            // fallback and use object
+            return Object.class;
+        }
+    }
+
     /**
      * Tries to find the best suited exception to throw.
      * <p/>
@@ -126,5 +218,18 @@ public class CamelInvocationHandler impl
         return null;
     }
 
+    protected static synchronized ExecutorService getExecutorService(CamelContext context) {
+        // CamelContext will shutdown thread pool when it shutdown so we can lazy create it on demand
+        // but in case of hot-deploy or the likes we need to be able to re-create it (its a shared static instance)
+        if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) {
+            // try to lookup a pool first based on id/profile
+            executorService = context.getExecutorServiceStrategy().lookup(CamelInvocationHandler.class, "CamelInvocationHandler", "CamelInvocationHandler");
+            if (executorService == null) {
+                executorService = context.getExecutorServiceStrategy().newDefaultThreadPool(CamelInvocationHandler.class, "CamelInvocationHandler");
+            }
+        }
+        return executorService;
+    }
+
 }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java?rev=1087229&r1=1087228&r2=1087229&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java Thu Mar 31 09:54:38 2011
@@ -83,16 +83,6 @@ public class BeanProxyTest extends Conte
         assertEquals("<order>FAIL</order>", reply);
     }
 
-    // TODO: Does not pass on JDK6
-
-    public void disabledtestBeanProxyFailureNullBody() throws Exception {
-        Endpoint endpoint = context.getEndpoint("direct:start");
-        OrderService service = ProxyHelper.createProxy(endpoint, OrderService.class);
-
-        String reply = service.submitOrderStringReturnString(null);
-        assertEquals("<order>FAIL</order>", reply);
-    }
-
     public void testBeanProxyFailureNotXMLBody() throws Exception {
         Endpoint endpoint = context.getEndpoint("direct:start");
         OrderService service = ProxyHelper.createProxy(endpoint, OrderService.class);

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java?rev=1087229&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java Thu Mar 31 09:54:38 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ProxyReturnFutureExceptionTest extends ContextTestSupport {
+
+    public void testFutureEchoException() throws Exception {
+        Echo service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class);
+
+        Future future = service.asText(4);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+        try {
+            assertEquals("Four", future.get(5, TimeUnit.SECONDS));
+            fail("Should have thrown exception");
+        } catch (ExecutionException e) {
+            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Forced", cause.getMessage());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:echo")
+                    .delay(2000)
+                    .throwException(new IllegalArgumentException("Forced"));
+            }
+        };
+    }
+
+    public static interface Echo {
+        Future<String> asText(int number);
+    }
+
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java?rev=1087229&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java Thu Mar 31 09:54:38 2011
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ProxyReturnFutureListTest extends ContextTestSupport {
+
+    public void testFutureList() throws Exception {
+        Users service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Users.class);
+
+        Future future = service.getUsers(true);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        List<String> users = (List<String>) future.get(5, TimeUnit.SECONDS);
+        assertEquals("Claus", users.get(0));
+        assertEquals("Jonathan", users.get(1));
+    }
+
+    public void testFutureListCallTwoTimes() throws Exception {
+        Users service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Users.class);
+
+        Future future = service.getUsers(true);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        List<String> users = (List<String>) future.get(5, TimeUnit.SECONDS);
+        assertEquals("Claus", users.get(0));
+        assertEquals("Jonathan", users.get(1));
+
+        future = service.getUsers(true);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        users = (List<String>) future.get(5, TimeUnit.SECONDS);
+        assertEquals("Claus", users.get(0));
+        assertEquals("Jonathan", users.get(1));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:echo")
+                    .delay(2000)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            List<String> users = new ArrayList<String>();
+                            users.add("Claus");
+                            users.add("Jonathan");
+                            exchange.getIn().setBody(users);
+                        }
+                    });
+            }
+        };
+    }
+
+    public static interface Users {
+        Future<List<String>> getUsers(boolean gold);
+    }
+
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java?rev=1087229&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java Thu Mar 31 09:54:38 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ProxyReturnFutureTest extends ContextTestSupport {
+
+    // START SNIPPET: e2
+    public void testFutureEcho() throws Exception {
+        Echo service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class);
+
+        Future future = service.asText(4);
+        log.info("Got future");
+
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        String reply = (String) future.get(5, TimeUnit.SECONDS);
+        assertEquals("Four", reply);
+    }
+    // END SNIPPET: e2
+
+    public void testFutureEchoCallTwoTimes() throws Exception {
+        Echo service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class);
+
+        Future future = service.asText(4);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+        assertEquals("Four", future.get(5, TimeUnit.SECONDS));
+
+        future = service.asText(5);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+        assertEquals("Four", future.get(5, TimeUnit.SECONDS));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:echo")
+                    .delay(2000)
+                    .transform().constant("Four");
+            }
+        };
+    }
+
+    // START SNIPPET: e1
+    public static interface Echo {
+
+        // returning a Future indicate asynchronous invocation
+        Future<String> asText(int number);
+
+    }
+    // END SNIPPET: e1
+
+}