You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/03/31 11:54:38 UTC
svn commit: r1087229 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/bean/
test/java/org/apache/camel/component/bean/
Author: davsclaus
Date: Thu Mar 31 09:54:38 2011
New Revision: 1087229
URL: http://svn.apache.org/viewvc?rev=1087229&view=rev
Log:
CAMEL-3790: Camel proxy now supports asynchronous clients using Future handles.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java?rev=1087229&r1=1087228&r2=1087229&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java Thu Mar 31 09:54:38 2011
@@ -18,10 +18,19 @@ package org.apache.camel.component.bean;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultExchange;
@@ -38,6 +47,9 @@ import org.slf4j.LoggerFactory;
public class CamelInvocationHandler implements InvocationHandler {
private static final transient Logger LOG = LoggerFactory.getLogger(CamelInvocationHandler.class);
+ // use a static thread pool to not create a new thread pool for each invocation
+ private static ExecutorService executorService;
+
private final Endpoint endpoint;
private final Producer producer;
private final MethodInfoCache methodInfoCache;
@@ -48,26 +60,61 @@ public class CamelInvocationHandler impl
this.methodInfoCache = methodInfoCache;
}
-    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+    /**
+     * Invokes the proxied method by wrapping the call in a {@link BeanInvocation} and passing it
+     * to the producer as the IN body of a new exchange.
+     * <p/>
+     * If the declared return type of the method is exactly {@link Future} the task is submitted to
+     * the shared thread pool and returned right away, so the caller can wait on it. Otherwise the
+     * task is run on the calling thread and its result is returned directly.
+     *
+     * @param proxy  the proxy instance the method was invoked on (not used by this method)
+     * @param method the method being invoked
+     * @param args   the arguments of the invocation
+     * @return the task itself when the method returns a future, otherwise the result of the task
+     * @throws Throwable the cause of the {@link ExecutionException} when a synchronous invocation failed
+     */
+    public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
         BeanInvocation invocation = new BeanInvocation(method, args);
-        ExchangePattern pattern = ExchangePattern.InOut;
         MethodInfo methodInfo = methodInfoCache.getMethodInfo(method);
-        if (methodInfo != null) {
-            pattern = methodInfo.getPattern();
-        }
-        Exchange exchange = new DefaultExchange(endpoint, pattern);
+
+        // default to InOut when there is no method info for the method
+        final ExchangePattern pattern = methodInfo != null ? methodInfo.getPattern() : ExchangePattern.InOut;
+        final Exchange exchange = new DefaultExchange(endpoint, pattern);
         exchange.getIn().setBody(invocation);
-        // process the exchange
-        LOG.trace("Proxied method call {} invoking producer: {}", method.getName(), producer);
-        producer.process(exchange);
+        // is the return type a future (must be declared as Future itself, sub types do not count)
+        final boolean isFuture = method.getReturnType() == Future.class;
+
+        // create task to execute the proxy and gather the reply
+        FutureTask<Object> task = new FutureTask<Object>(new Callable<Object>() {
+            public Object call() throws Exception {
+                // process the exchange
+                LOG.trace("Proxied method call {} invoking producer: {}", method.getName(), producer);
+                producer.process(exchange);
+
+                Object answer = afterInvoke(method, exchange, pattern, isFuture);
+                LOG.trace("Proxied method call {} returning: {}", method.getName(), answer);
+                return answer;
+            }
+        });
+
+        if (isFuture) {
+            // submit task and return future
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Submitting task for exchange id {}", exchange.getExchangeId());
+            }
+            getExecutorService(exchange.getContext()).submit(task);
+            return task;
+        } else {
+            // execute task now, on the calling thread
+            try {
+                task.run();
+                return task.get();
+            } catch (ExecutionException e) {
+                // we don't want the wrapped exception from JDK
+                throw e.getCause();
+            }
+        }
+    }
+ protected Object afterInvoke(Method method, Exchange exchange, ExchangePattern pattern, boolean isFuture) throws Exception {
// check if we had an exception
Throwable cause = exchange.getException();
if (cause != null) {
Throwable found = findSuitableException(cause, method);
if (found != null) {
- throw found;
+ if (found instanceof Exception) {
+ throw (Exception) found;
+ } else {
+ // this method can only throw Exception, so wrap the original cause together with the exchange
+ throw new CamelExchangeException("Error processing exchange", exchange, cause);
+ }
}
// special for runtime camel exceptions as they can be nested
if (cause instanceof RuntimeCamelException) {
@@ -78,28 +125,73 @@ public class CamelInvocationHandler impl
throw (RuntimeCamelException) cause;
}
// okay just throw the exception as is
- throw cause;
+ if (cause instanceof Exception) {
+ throw (Exception) cause;
+ } else {
+ // this method can only throw Exception, so wrap the original cause together with the exchange
+ throw new CamelExchangeException("Error processing exchange", exchange, cause);
+ }
}
- // do not return a reply if the method is VOID or the MEP is not OUT capable
+ // do not return a reply if the method is VOID (whether the MEP is OUT capable is no longer checked here)
Class<?> to = method.getReturnType();
- if (to == Void.TYPE || !pattern.isOutCapable()) {
- return null;
- }
-
- // only convert if there is a body
- if (!exchange.hasOut() || exchange.getOut().getBody() == null) {
- // there is no body so return null
+ if (to == Void.TYPE) {
return null;
}
// use type converter so we can convert output in the desired type defined by the method
// and let it be mandatory so we know wont return null if we cant convert it to the defined type
- Object answer = exchange.getOut().getMandatoryBody(to);
- LOG.trace("Proxied method call {} returning: {}", method.getName(), answer);
+ Object answer;
+ if (!isFuture) {
+ answer = getBody(exchange, to);
+ } else {
+ // if it's a Future then the declared return type is Future itself, so we need to extract the class
+ // from the generic return type of the method to know which class to return the result as
+ Class<?> returnTo = getGenericType(exchange.getContext(), method.getGenericReturnType());
+ answer = getBody(exchange, returnTo);
+ }
+
return answer;
}
+    /**
+     * Gets the reply body from the exchange, converted to the given type.
+     * <p/>
+     * The OUT message is used when the exchange has one, otherwise the IN message is used.
+     *
+     * @param exchange the exchange to get the body from
+     * @param type     the type the body must be converted to
+     * @return the converted body, or <tt>null</tt> if the message being used has no body
+     * @throws InvalidPayloadException declared by the mandatory conversion of the body
+     */
+    private static Object getBody(Exchange exchange, Class<?> type) throws InvalidPayloadException {
+        if (exchange.hasOut()) {
+            // there is an OUT message, so only that one is considered
+            return exchange.getOut().getBody() == null ? null : exchange.getOut().getMandatoryBody(type);
+        }
+
+        // no OUT message, so use the IN message instead
+        return exchange.getIn().getBody() == null ? null : exchange.getIn().getMandatoryBody(type);
+    }
+
+    /**
+     * Gets the class of the first type argument of the given generic type, for example
+     * <tt>String</tt> for <tt>Future&lt;String&gt;</tt>.
+     * <p/>
+     * The type argument is read using the reflection API rather than being parsed out of
+     * <tt>Type.toString()</tt>, so wildcards, arrays and multiple type arguments no longer
+     * produce a class name which cannot be resolved.
+     *
+     * @param context the camel context, no longer used for the lookup but kept for backwards compatibility
+     * @param type    the generic type, such as the generic return type of a method, may be <tt>null</tt>
+     * @return the class of the first type argument (the outer class if the argument is parameterized
+     *         itself), or <tt>Object.class</tt> if there is no type argument or it is not a class
+     * @throws ClassNotFoundException no longer thrown, but kept in the signature for backwards compatibility
+     */
+    protected static Class getGenericType(CamelContext context, Type type) throws ClassNotFoundException {
+        // a null type is not a ParameterizedType and therefore ends in the fallback below
+        if (type instanceof java.lang.reflect.ParameterizedType) {
+            Type[] arguments = ((java.lang.reflect.ParameterizedType) type).getActualTypeArguments();
+            Type argument = arguments.length > 0 ? arguments[0] : null;
+            if (argument instanceof java.lang.reflect.ParameterizedType) {
+                // we only need the outer type, eg List from List<String>
+                argument = ((java.lang.reflect.ParameterizedType) argument).getRawType();
+            }
+            if (argument instanceof Class) {
+                return (Class) argument;
+            }
+        }
+
+        // fallback and use object (no type argument, or a wildcard, type variable or generic array)
+        return Object.class;
+    }
+
/**
* Tries to find the best suited exception to throw.
* <p/>
@@ -126,5 +218,18 @@ public class CamelInvocationHandler impl
return null;
}
+    /**
+     * Gets the shared thread pool used for running the asynchronous invocations,
+     * creating it on demand.
+     *
+     * @param context the camel context used to lookup or create the thread pool
+     * @return the shared thread pool
+     */
+    protected static synchronized ExecutorService getExecutorService(CamelContext context) {
+        // CamelContext will shutdown thread pool when it shutdown so we can lazy create it on demand
+        // but in case of hot-deploy or the likes we need to be able to re-create it (its a shared static instance)
+        boolean usable = executorService != null && !executorService.isTerminated() && !executorService.isShutdown();
+        if (usable) {
+            return executorService;
+        }
+
+        // try to lookup a pool first based on id/profile
+        executorService = context.getExecutorServiceStrategy().lookup(CamelInvocationHandler.class, "CamelInvocationHandler", "CamelInvocationHandler");
+        if (executorService == null) {
+            // nothing found by the lookup, so create a new default pool
+            executorService = context.getExecutorServiceStrategy().newDefaultThreadPool(CamelInvocationHandler.class, "CamelInvocationHandler");
+        }
+        return executorService;
+    }
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java?rev=1087229&r1=1087228&r2=1087229&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java Thu Mar 31 09:54:38 2011
@@ -83,16 +83,6 @@ public class BeanProxyTest extends Conte
assertEquals("<order>FAIL</order>", reply);
}
- // TODO: Does not pass on JDK6
-
- public void disabledtestBeanProxyFailureNullBody() throws Exception {
- Endpoint endpoint = context.getEndpoint("direct:start");
- OrderService service = ProxyHelper.createProxy(endpoint, OrderService.class);
-
- String reply = service.submitOrderStringReturnString(null);
- assertEquals("<order>FAIL</order>", reply);
- }
-
public void testBeanProxyFailureNotXMLBody() throws Exception {
Endpoint endpoint = context.getEndpoint("direct:start");
OrderService service = ProxyHelper.createProxy(endpoint, OrderService.class);
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java?rev=1087229&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java Thu Mar 31 09:54:38 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Tests that an exception thrown by the route is reported as the cause of the
+ * {@link ExecutionException} when getting the result of the {@link Future} returned by the proxy.
+ *
+ * @version
+ */
+public class ProxyReturnFutureExceptionTest extends ContextTestSupport {
+
+    public void testFutureEchoException() throws Exception {
+        Echo service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class);
+
+        Future<String> future = service.asText(4);
+        log.info("Got future");
+        // the route delays for 2 seconds so the future should not be done at this point
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+        try {
+            // no reply is expected here, getting the result must fail with the exception from the route
+            future.get(5, TimeUnit.SECONDS);
+            fail("Should have thrown exception");
+        } catch (ExecutionException e) {
+            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Forced", cause.getMessage());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // delay before failing so the test can see the future while it is not done
+                from("direct:echo")
+                    .delay(2000)
+                    .throwException(new IllegalArgumentException("Forced"));
+            }
+        };
+    }
+
+    public static interface Echo {
+        // returning a Future indicate asynchronous invocation
+        Future<String> asText(int number);
+    }
+
+}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java?rev=1087229&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java Thu Mar 31 09:54:38 2011
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Tests a proxied method returning a {@link Future} whose type argument is itself generic
+ * (a list of strings).
+ *
+ * @version
+ */
+public class ProxyReturnFutureListTest extends ContextTestSupport {
+
+    public void testFutureList() throws Exception {
+        Users service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Users.class);
+
+        Future<List<String>> future = service.getUsers(true);
+        log.info("Got future");
+        // the route delays for 2 seconds so the future should not be done at this point
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        List<String> users = future.get(5, TimeUnit.SECONDS);
+        assertEquals("Claus", users.get(0));
+        assertEquals("Jonathan", users.get(1));
+    }
+
+    public void testFutureListCallTwoTimes() throws Exception {
+        Users service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Users.class);
+
+        Future<List<String>> future = service.getUsers(true);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        List<String> users = future.get(5, TimeUnit.SECONDS);
+        assertEquals("Claus", users.get(0));
+        assertEquals("Jonathan", users.get(1));
+
+        // invoke the same proxy a second time, which must hand out a new future
+        future = service.getUsers(true);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        users = future.get(5, TimeUnit.SECONDS);
+        assertEquals("Claus", users.get(0));
+        assertEquals("Jonathan", users.get(1));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:echo")
+                    .delay(2000)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            // the reply is set on the IN message, there is no OUT message
+                            List<String> users = new ArrayList<String>();
+                            users.add("Claus");
+                            users.add("Jonathan");
+                            exchange.getIn().setBody(users);
+                        }
+                    });
+            }
+        };
+    }
+
+    public static interface Users {
+        // returning a Future indicate asynchronous invocation
+        Future<List<String>> getUsers(boolean gold);
+    }
+
+}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java?rev=1087229&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java Thu Mar 31 09:54:38 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Tests a proxied method returning a {@link Future}, which is invoked asynchronously.
+ *
+ * @version
+ */
+public class ProxyReturnFutureTest extends ContextTestSupport {
+
+    // START SNIPPET: e2
+    public void testFutureEcho() throws Exception {
+        Echo service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class);
+
+        Future<String> future = service.asText(4);
+        log.info("Got future");
+
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        String reply = future.get(5, TimeUnit.SECONDS);
+        assertEquals("Four", reply);
+    }
+    // END SNIPPET: e2
+
+    public void testFutureEchoCallTwoTimes() throws Exception {
+        Echo service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class);
+
+        Future<String> future = service.asText(4);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+        assertEquals("Four", future.get(5, TimeUnit.SECONDS));
+
+        // the route replies with the constant Four no matter which number is given
+        future = service.asText(5);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+        assertEquals("Four", future.get(5, TimeUnit.SECONDS));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // delay the reply so the tests can see the future while it is not done
+                from("direct:echo")
+                    .delay(2000)
+                    .transform().constant("Four");
+            }
+        };
+    }
+
+    // START SNIPPET: e1
+    public static interface Echo {
+
+        // returning a Future indicate asynchronous invocation
+        Future<String> asText(int number);
+
+    }
+    // END SNIPPET: e1
+
+}