You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2009/07/27 09:06:20 UTC

svn commit: r798045 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ProducerTemplate.java main/java/org/apache/camel/impl/DefaultProducerTemplate.java test/java/org/apache/camel/processor/async/AsyncRouteTest.java

Author: ningjiang
Date: Mon Jul 27 07:06:19 2009
New Revision: 798045

URL: http://svn.apache.org/viewvc?rev=798045&view=rev
Log:
Added the async methods which use Endpoint as parameter

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=798045&r1=798044&r2=798045&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Mon Jul 27 07:06:19 2009
@@ -780,6 +780,103 @@
     <T> Future<T> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type);
 
     /**
+     * Sends an asynchronous exchange to the given endpoint.
+     *
+     * @param endpoint    the endpoint to send the exchange to
+     * @param exchange    the exchange to send
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);
+
+    /**
+     * Sends an asynchronous exchange to the given endpoint.
+     *
+     * @param endpoint    the endpoint to send the exchange to
+     * @param processor   the transformer used to populate the new exchange
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOnly} message exchange pattern.
+     *
+     * @param endpoint    the endpoint to send the exchange to
+     * @param body        the body to send
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncSendBody(Endpoint endpoint, Object body);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpoint    the endpoint to send the exchange to
+     * @param body        the body to send
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncRequestBody(Endpoint endpoint, Object body);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpoint the endpoint to send the exchange to
+     * @param body        the body to send
+     * @param header      the header name
+     * @param headerValue the header value
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpoint    the endpoint to send the exchange to
+     * @param body        the body to send
+     * @param headers     headers
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpoint    the endpoint to send the exchange to
+     * @param body        the body to send
+     * @param type        the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    <T> Future<T> asyncRequestBody(Endpoint endpoint, Object body, Class<T> type);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpoint    the endpoint to send the exchange to
+     * @param body        the body to send
+     * @param header      the header name
+     * @param headerValue the header value
+     * @param type        the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    <T> Future<T> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpoint    the endpoint to send the exchange to
+     * @param body        the body to send
+     * @param headers     headers
+     * @param type        the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    <T> Future<T> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type);
+
+    /**
      * Gets the response body from the future handle, will wait until the response is ready.
      * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
      * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=798045&r1=798044&r2=798045&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Mon Jul 27 07:06:19 2009
@@ -435,103 +435,145 @@
     }
 
     public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
-        Callable<Exchange> task = new Callable<Exchange>() {
-            public Exchange call() throws Exception {
-                return send(uri, exchange);
-            }
-        };
-
-        return executor.submit(task);
+        return asyncSend(resolveMandatoryEndpoint(uri), exchange);
     }
 
     public Future<Exchange> asyncSend(final String uri, final Processor processor) {
-        Callable<Exchange> task = new Callable<Exchange>() {
-            public Exchange call() throws Exception {
-                return send(uri, processor);
+        return asyncSend(resolveMandatoryEndpoint(uri), processor);
+    }
+
+    public Future<Object> asyncSendBody(final String uri, final Object body) {
+        return asyncSendBody(resolveMandatoryEndpoint(uri), body);
+    }
+
+    public Future<Object> asyncRequestBody(final String uri, final Object body) {
+        return asyncRequestBody(resolveMandatoryEndpoint(uri), body);
+    }
+
+    public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
+        return asyncRequestBody(resolveMandatoryEndpoint(uri), body, type);
+    }
+
+    public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
+        return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
+    }
+
+    public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
+        return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue, type);
+    }
+
+    public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
+        return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
+    }
+
+    public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
+        return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers, type);
+    }
+
+    public <T> T extractFutureBody(Future future, Class<T> type) {
+        return ExchangeHelper.extractFutureBody(context, future, type);
+    }
+
+    public <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
+        return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
+    }
+
+    public Future<Object> asyncRequestBody(final Endpoint endpoint, final Object body) {
+        Callable<Object> task = new Callable<Object>() {
+            public Object call() throws Exception {
+                return requestBody(endpoint, body);
             }
         };
 
         return executor.submit(task);
     }
 
-    public Future<Object> asyncSendBody(final String uri, final Object body) {
-        Callable<Object> task = new Callable<Object>() {
-            public Object call() throws Exception {
-                sendBody(uri, body);
-                // its InOnly, so no body to return
-                return null;
+    public <T> Future<T> asyncRequestBody(final Endpoint endpoint, final Object body, final Class<T> type) {
+        Callable<T> task = new Callable<T>() {
+            public T call() throws Exception {
+                return requestBody(endpoint, body, type);
             }
         };
 
         return executor.submit(task);
     }
 
-    public Future<Object> asyncRequestBody(final String uri, final Object body) {
+    public Future<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
+                                                    final Object headerValue) {
         Callable<Object> task = new Callable<Object>() {
             public Object call() throws Exception {
-                return requestBody(uri, body);
+                return requestBodyAndHeader(endpoint, body, header, headerValue);
             }
         };
 
         return executor.submit(task);
     }
 
-    public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
+    public <T> Future<T> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
+                                                   final Object headerValue, final Class<T> type) {
         Callable<T> task = new Callable<T>() {
             public T call() throws Exception {
-                return requestBody(uri, body, type);
+                return requestBodyAndHeader(endpoint, body, header, headerValue, type);
             }
         };
 
         return executor.submit(task);
     }
 
-    public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
+    
+    public Future<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
+                                                     final Map<String, Object> headers) {
         Callable<Object> task = new Callable<Object>() {
             public Object call() throws Exception {
-                return requestBodyAndHeader(endpointUri, body, header, headerValue);
+                return requestBodyAndHeaders(endpoint, body, headers);
             }
         };
 
         return executor.submit(task);
     }
 
-    public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
+    public <T> Future<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
+                                                    final Map<String, Object> headers, final Class<T> type) {
         Callable<T> task = new Callable<T>() {
             public T call() throws Exception {
-                return requestBodyAndHeader(endpointUri, body, header, headerValue, type);
+                return requestBodyAndHeaders(endpoint, body, headers, type);
             }
         };
 
         return executor.submit(task);
     }
 
-    public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
-        Callable<Object> task = new Callable<Object>() {
-            public Object call() throws Exception {
-                return requestBodyAndHeaders(endpointUri, body, headers);
+    public Future<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) {
+        Callable<Exchange> task = new Callable<Exchange>() {
+            public Exchange call() throws Exception {
+                return send(endpoint, exchange);
             }
         };
 
         return executor.submit(task);
     }
 
-    public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
-        Callable<T> task = new Callable<T>() {
-            public T call() throws Exception {
-                return requestBodyAndHeaders(endpointUri, body, headers, type);
+    public Future<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) {
+        Callable<Exchange> task = new Callable<Exchange>() {
+            public Exchange call() throws Exception {
+                return send(endpoint, processor);
             }
         };
 
         return executor.submit(task);
     }
 
-    public <T> T extractFutureBody(Future future, Class<T> type) {
-        return ExchangeHelper.extractFutureBody(context, future, type);
-    }
+    public Future<Object> asyncSendBody(final Endpoint endpoint, final Object body) {
+        Callable<Object> task = new Callable<Object>() {
+            public Object call() throws Exception {
+                sendBody(endpoint, body);
+                // its InOnly, so no body to return
+                return null;
+            }
+        };
 
-    public <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
-        return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
+        return executor.submit(task);
     }
+   
 
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java?rev=798045&r1=798044&r2=798045&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java Mon Jul 27 07:06:19 2009
@@ -129,7 +129,7 @@
                         .transform(body().append(" World"))
                             // now turn the route into async from this point forward
                             // the caller will have a Future<Exchange> returned as response in OUT
-                            // to be used to grap the async response when he fell like it
+                            // to be used to grape the async response when he fell like it
                         .threads()
                             // from this point forward this is the async route doing its work
                             // so we do a bit of delay to simulate heavy work that takes time