You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2009/07/27 09:06:20 UTC
svn commit: r798045 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/ProducerTemplate.java
main/java/org/apache/camel/impl/DefaultProducerTemplate.java
test/java/org/apache/camel/processor/async/AsyncRouteTest.java
Author: ningjiang
Date: Mon Jul 27 07:06:19 2009
New Revision: 798045
URL: http://svn.apache.org/viewvc?rev=798045&view=rev
Log:
Added the async methods which use Endpoint as parameter
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=798045&r1=798044&r2=798045&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Mon Jul 27 07:06:19 2009
@@ -780,6 +780,103 @@
<T> Future<T> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type);
/**
+ * Sends an asynchronous exchange to the given endpoint.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param exchange the exchange to send
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);
+
+ /**
+ * Sends an asynchronous exchange to the given endpoint.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param processor the transformer used to populate the new exchange
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOnly} message exchange pattern.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param body the body to send
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Object> asyncSendBody(Endpoint endpoint, Object body);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param body the body to send
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Object> asyncRequestBody(Endpoint endpoint, Object body);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param body the body to send
+ * @param header the header name
+ * @param headerValue the header value
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Object> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param body the body to send
+ * @param headers the headers to send along with the body
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Object> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param body the body to send
+ * @param type the expected response type
+ * @return a handle to be used to get the response in the future
+ */
+ <T> Future<T> asyncRequestBody(Endpoint endpoint, Object body, Class<T> type);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param body the body to send
+ * @param header the header name
+ * @param headerValue the header value
+ * @param type the expected response type
+ * @return a handle to be used to get the response in the future
+ */
+ <T> Future<T> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param body the body to send
+ * @param headers the headers to send along with the body
+ * @param type the expected response type
+ * @return a handle to be used to get the response in the future
+ */
+ <T> Future<T> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type);
+
+ /**
* Gets the response body from the future handle, will wait until the response is ready.
* <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
* it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=798045&r1=798044&r2=798045&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Mon Jul 27 07:06:19 2009
@@ -435,103 +435,145 @@
}
public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
- Callable<Exchange> task = new Callable<Exchange>() {
- public Exchange call() throws Exception {
- return send(uri, exchange);
- }
- };
-
- return executor.submit(task);
+ return asyncSend(resolveMandatoryEndpoint(uri), exchange);
}
public Future<Exchange> asyncSend(final String uri, final Processor processor) {
- Callable<Exchange> task = new Callable<Exchange>() {
- public Exchange call() throws Exception {
- return send(uri, processor);
+ return asyncSend(resolveMandatoryEndpoint(uri), processor);
+ }
+
+ public Future<Object> asyncSendBody(final String uri, final Object body) {
+ return asyncSendBody(resolveMandatoryEndpoint(uri), body);
+ }
+
+ public Future<Object> asyncRequestBody(final String uri, final Object body) {
+ return asyncRequestBody(resolveMandatoryEndpoint(uri), body);
+ }
+
+ public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
+ return asyncRequestBody(resolveMandatoryEndpoint(uri), body, type);
+ }
+
+ public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
+ return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
+ }
+
+ public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
+ return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue, type);
+ }
+
+ public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
+ return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
+ }
+
+ public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
+ return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers, type);
+ }
+
+ public <T> T extractFutureBody(Future future, Class<T> type) {
+ return ExchangeHelper.extractFutureBody(context, future, type);
+ }
+
+ public <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
+ return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
+ }
+
+ public Future<Object> asyncRequestBody(final Endpoint endpoint, final Object body) {
+ Callable<Object> task = new Callable<Object>() {
+ public Object call() throws Exception {
+ return requestBody(endpoint, body);
}
};
return executor.submit(task);
}
- public Future<Object> asyncSendBody(final String uri, final Object body) {
- Callable<Object> task = new Callable<Object>() {
- public Object call() throws Exception {
- sendBody(uri, body);
- // its InOnly, so no body to return
- return null;
+ public <T> Future<T> asyncRequestBody(final Endpoint endpoint, final Object body, final Class<T> type) {
+ Callable<T> task = new Callable<T>() {
+ public T call() throws Exception {
+ return requestBody(endpoint, body, type);
}
};
return executor.submit(task);
}
- public Future<Object> asyncRequestBody(final String uri, final Object body) {
+ public Future<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
+ final Object headerValue) {
Callable<Object> task = new Callable<Object>() {
public Object call() throws Exception {
- return requestBody(uri, body);
+ return requestBodyAndHeader(endpoint, body, header, headerValue);
}
};
return executor.submit(task);
}
- public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
+ public <T> Future<T> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
+ final Object headerValue, final Class<T> type) {
Callable<T> task = new Callable<T>() {
public T call() throws Exception {
- return requestBody(uri, body, type);
+ return requestBodyAndHeader(endpoint, body, header, headerValue, type);
}
};
return executor.submit(task);
}
- public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
+
+ public Future<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
+ final Map<String, Object> headers) {
Callable<Object> task = new Callable<Object>() {
public Object call() throws Exception {
- return requestBodyAndHeader(endpointUri, body, header, headerValue);
+ return requestBodyAndHeaders(endpoint, body, headers);
}
};
return executor.submit(task);
}
- public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
+ public <T> Future<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
+ final Map<String, Object> headers, final Class<T> type) {
Callable<T> task = new Callable<T>() {
public T call() throws Exception {
- return requestBodyAndHeader(endpointUri, body, header, headerValue, type);
+ return requestBodyAndHeaders(endpoint, body, headers, type);
}
};
return executor.submit(task);
}
- public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
- Callable<Object> task = new Callable<Object>() {
- public Object call() throws Exception {
- return requestBodyAndHeaders(endpointUri, body, headers);
+ public Future<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) {
+ Callable<Exchange> task = new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ return send(endpoint, exchange);
}
};
return executor.submit(task);
}
- public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
- Callable<T> task = new Callable<T>() {
- public T call() throws Exception {
- return requestBodyAndHeaders(endpointUri, body, headers, type);
+ public Future<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) {
+ Callable<Exchange> task = new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ return send(endpoint, processor);
}
};
return executor.submit(task);
}
- public <T> T extractFutureBody(Future future, Class<T> type) {
- return ExchangeHelper.extractFutureBody(context, future, type);
- }
+ public Future<Object> asyncSendBody(final Endpoint endpoint, final Object body) {
+ Callable<Object> task = new Callable<Object>() {
+ public Object call() throws Exception {
+ sendBody(endpoint, body);
+ // it's InOnly, so no body to return
+ return null;
+ }
+ };
- public <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
- return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
+ return executor.submit(task);
}
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java?rev=798045&r1=798044&r2=798045&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java Mon Jul 27 07:06:19 2009
@@ -129,7 +129,7 @@
.transform(body().append(" World"))
// now turn the route into async from this point forward
// the caller will have a Future<Exchange> returned as response in OUT
- // to be used to grap the async response when he fell like it
+ // to be used to grab the async response when he feels like it
.threads()
// from this point forward this is the async route doing its work
// so we do a bit of delay to simulate heavy work that takes time