You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/07 12:42:44 UTC

svn commit: r772598 [1/2] - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/impl/converter/ main/java/org/apache/camel/model/ main/java/o...

Author: davsclaus
Date: Thu May  7 10:42:40 2009
New Revision: 772598

URL: http://svn.apache.org/viewvc?rev=772598&view=rev
Log:
CAMEL-1572: Added first cut of async DSL. Fixed CS.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java
      - copied, changed from r772297, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteWithErrorTest.java   (with props)
    camel/trunk/camel-core/src/test/resources/org/apache/camel/processor/students.xml   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java Thu May  7 10:42:40 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Exception occurred during execution/processing of an {@link Exchange}.
+ * <p/>
+ * Is usually thrown to the caller when using the {@link org.apache.camel.ProducerTemplate}
+ * to send messages to Camel.
+ *
+ * @version $Revision$
+ */
+public class CamelExecutionException extends RuntimeExchangeException {
+
+    public CamelExecutionException(String message, Exchange exchange) {
+        super(message, exchange);
+    }
+
+    public CamelExecutionException(String message, Exchange exchange, Throwable cause) {
+        super(message, exchange, cause);
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu May  7 10:42:40 2009
@@ -30,6 +30,9 @@
  */
 public interface Exchange {
 
+    String ASYNC_WAIT = "CamelAsyncWait";
+    String ASYNC_WAIT_TIMEOUT = "CamelAsyncWaitTimeout";
+
     String BEAN_METHOD_NAME = "CamelBeanMethodName";
     String BEAN_HOLDER = "CamelBeanHolder";
     String BEAN_MULTI_PARAMETER_ARRAY = "CamelBeanMultiParameterArray";

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java Thu May  7 10:42:40 2009
@@ -29,13 +29,13 @@
     }
 
     public InvalidPayloadException(Exchange exchange, Class<?> type, Message message) {
-        super("No body available of type: " + type.getName()
+        super("No body available of type: " + type.getCanonicalName()
               + NoSuchPropertyException.valueDescription(message.getBody()) + " on: " + message, exchange);
         this.type = type;
     }
 
     public InvalidPayloadException(Exchange exchange, Class<?> type, Message message, Throwable cause) {
-        super("No body available of type: " + type.getName()
+        super("No body available of type: " + type.getCanonicalName()
               + NoSuchPropertyException.valueDescription(message.getBody()) + " on: " + message
               + ". Caused by: " + cause.getMessage(), exchange, cause);
         this.type = type;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Thu May  7 10:42:40 2009
@@ -70,37 +70,53 @@
 
     /**
      * Sends the body to the default endpoint
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param body the payload to send
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBody(Object body);
 
     /**
      * Sends the body to the default endpoint with a specified header and header
      * value
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param body the payload to send
      * @param header the header name
      * @param headerValue the header value
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBodyAndHeader(Object body, String header, Object headerValue);
 
     /**
      * Sends the body to the default endpoint with a specified property and property
      * value
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param body          the payload to send
      * @param property      the property name
      * @param propertyValue the property value
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBodyAndProperty(Object body, String property, Object propertyValue);
     
     /**
      * Sends the body to the default endpoint with the specified headers and
      * header values
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param body the payload to send
      * @param headers      the headers
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBodyAndHeaders(Object body, Map<String, Object> headers);
 
@@ -171,65 +187,95 @@
 
     /**
      * Send the body to an endpoint
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint   the endpoint to send the exchange to
      * @param body       the payload
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBody(Endpoint endpoint, Object body);
 
     /**
      * Send the body to an endpoint
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri   the endpoint URI to send the exchange to
      * @param body          the payload
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBody(String endpointUri, Object body);
 
     /**
      * Send the body to an endpoint with the given {@link ExchangePattern}
      * returning any result output body
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint      the endpoint to send the exchange to
      * @param body          the payload
      * @param pattern       the message {@link ExchangePattern} such as
      *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
      * @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body);
 
     /**
      * Send the body to an endpoint returning any result output body
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri   the endpoint URI to send the exchange to
      * @param pattern       the message {@link ExchangePattern} such as
      *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
      * @param body          the payload
      * @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object sendBody(String endpointUri, ExchangePattern pattern, Object body);
 
     /**
      * Sends the body to an endpoint with a specified header and header value
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri the endpoint URI to send to
      * @param body the payload to send
      * @param header the header name
      * @param headerValue the header value
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBodyAndHeader(String endpointUri, Object body, String header, Object headerValue);
 
     /**
      * Sends the body to an endpoint with a specified header and header value
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the Endpoint to send to
      * @param body the payload to send
      * @param header the header name
      * @param headerValue the header value
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue);
 
     /**
      * Sends the body to an endpoint with a specified header and header value
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the Endpoint to send to
      * @param pattern the message {@link ExchangePattern} such as
@@ -238,12 +284,16 @@
      * @param header the header name
      * @param headerValue the header value
      * @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, Object body,
                              String header, Object headerValue);
 
     /**
      * Sends the body to an endpoint with a specified header and header value
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the Endpoint URI to send to
      * @param pattern the message {@link ExchangePattern} such as
@@ -252,32 +302,44 @@
      * @param header the header name
      * @param headerValue the header value
      * @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, Object body,
                              String header, Object headerValue);
 
     /**
      * Sends the body to an endpoint with a specified property and property value
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri the endpoint URI to send to
      * @param body the payload to send
      * @param property the property name
      * @param propertyValue the property value
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBodyAndProperty(String endpointUri, Object body, String property, Object propertyValue);
 
     /**
      * Sends the body to an endpoint with a specified property and property value
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the Endpoint to send to
      * @param body the payload to send
      * @param property the property name
      * @param propertyValue the property value
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBodyAndProperty(Endpoint endpoint, Object body, String property, Object propertyValue);
 
     /**
      * Sends the body to an endpoint with a specified property and property value
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the Endpoint to send to
      * @param pattern the message {@link ExchangePattern} such as
@@ -286,12 +348,16 @@
      * @param property the property name
      * @param propertyValue the property value
      * @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, Object body,
                                String property, Object propertyValue);
 
     /**
      * Sends the body to an endpoint with a specified property and property value
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the Endpoint URI to send to
      * @param pattern the message {@link ExchangePattern} such as
@@ -300,33 +366,42 @@
      * @param property the property name
      * @param propertyValue the property value
      * @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, Object body,
                                String property, Object propertyValue);
 
     /**
-     * Sends the body to an endpoint with the specified headers and header
-     * values
+     * Sends the body to an endpoint with the specified headers and header values
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri the endpoint URI to send to
      * @param body the payload to send
      * @param headers headers
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers);
 
     /**
-     * Sends the body to an endpoint with the specified headers and header
-     * values
+     * Sends the body to an endpoint with the specified headers and header values
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the endpoint URI to send to
      * @param body the payload to send
      * @param headers headers
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     void sendBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers);
 
     /**
-     * Sends the body to an endpoint with the specified headers and header
-     * values
+     * Sends the body to an endpoint with the specified headers and header values
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri the endpoint URI to send to
      * @param pattern the message {@link ExchangePattern} such as
@@ -334,13 +409,16 @@
      * @param body the payload to send
      * @param headers headers
      * @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body,
                               Map<String, Object> headers);
 
     /**
-     * Sends the body to an endpoint with the specified headers and header
-     * values
+     * Sends the body to an endpoint with the specified headers and header values
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the endpoint URI to send to
      * @param pattern the message {@link ExchangePattern} such as
@@ -348,6 +426,7 @@
      * @param body the payload to send
      * @param headers headers
      * @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, Object body,
                               Map<String, Object> headers);
@@ -379,79 +458,110 @@
     /**
      * Sends the body to the default endpoint and returns the result content
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param body the payload to send
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object requestBody(Object body);
 
     /**
      * Sends the body to the default endpoint and returns the result content
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param body the payload to send
      * @param type the expected response type
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     <T> T requestBody(Object body, Class<T> type);
 
     /**
      * Send the body to an endpoint returning any result output body.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the Endpoint to send to
      * @param body     the payload
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object requestBody(Endpoint endpoint, Object body);
 
     /**
      * Send the body to an endpoint returning any result output body.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the Endpoint to send to
      * @param body     the payload
      * @param type     the expected response type
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     <T> T requestBody(Endpoint endpoint, Object body, Class<T> type);
 
     /**
      * Send the body to an endpoint returning any result output body.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri the endpoint URI to send to
      * @param body        the payload
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object requestBody(String endpointUri, Object body);
 
     /**
      * Send the body to an endpoint returning any result output body.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri the endpoint URI to send to
      * @param body        the payload
      * @param type        the expected response type
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     <T> T requestBody(String endpointUri, Object body, Class<T> type);
 
     /**
      * Send the body to an endpoint returning any result output body.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint    the Endpoint to send to
      * @param body        the payload
      * @param header      the header name
      * @param headerValue the header value
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue);
 
     /**
      * Send the body to an endpoint returning any result output body.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint    the Endpoint to send to
      * @param body        the payload
@@ -459,24 +569,32 @@
      * @param headerValue the header value
      * @param type        the expected response type
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type);
 
     /**
      * Send the body to an endpoint returning any result output body.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri the endpoint URI to send to
      * @param body        the payload
      * @param header      the header name
      * @param headerValue the header value
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue);
 
     /**
      * Send the body to an endpoint returning any result output body.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri the endpoint URI to send to
      * @param body        the payload
@@ -484,56 +602,69 @@
      * @param headerValue the header value
      * @param type        the expected response type
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type);
 
     /**
-     * Sends the body to an endpoint with the specified headers and header
-     * values.
+     * Sends the body to an endpoint with the specified headers and header values.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri the endpoint URI to send to
      * @param body the payload to send
      * @param headers headers
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers);
 
     /**
-     * Sends the body to an endpoint with the specified headers and header
-     * values.
+     * Sends the body to an endpoint with the specified headers and header values.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpointUri the endpoint URI to send to
      * @param body the payload to send
      * @param headers headers
      * @param type the expected response type
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type);
 
     /**
-     * Sends the body to an endpoint with the specified headers and header
-     * values.
+     * Sends the body to an endpoint with the specified headers and header values.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the endpoint URI to send to
      * @param body the payload to send
      * @param headers headers
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     Object requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers);
 
     /**
-     * Sends the body to an endpoint with the specified headers and header
-     * values.
+     * Sends the body to an endpoint with the specified headers and header values.
      * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param endpoint the endpoint URI to send to
      * @param body the payload to send
      * @param headers headers
      * @param type the expected response type
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type);
 
@@ -650,15 +781,22 @@
 
     /**
      * Gets the response body from the future handle, will wait until the response is ready.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param future      the handle to get the response
      * @param type        the expected response type
      * @return the result (see class javadoc)
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     <T> T asyncExtractBody(Future future, Class<T> type);
 
     /**
      * Gets the response body from the future handle, will wait at most the given time for the response to be ready.
+     * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+     * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+     * the caused exception wrapped.
      *
      * @param future      the handle to get the response
      * @param timeout     the maximum time to wait
@@ -666,6 +804,7 @@
      * @param type        the expected response type
      * @return the result (see class javadoc)
      * @throws java.util.concurrent.TimeoutException if the wait timed out
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     <T> T asyncExtractBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java Thu May  7 10:42:40 2009
@@ -21,22 +21,22 @@
  *
  * @version $Revision$
  */
-public class RollbackExchangeException extends Exception {
+public class RollbackExchangeException extends CamelExchangeException {
 
     public RollbackExchangeException(Exchange exchange) {
-        this("Intended rollback on exchange", exchange);
+        this("Intended rollback", exchange);
     }
 
     public RollbackExchangeException(Exchange exchange, Throwable cause) {
-        this("Intended rollback on exchange", exchange, cause);
+        this("Intended rollback", exchange, cause);
     }
 
     public RollbackExchangeException(String message, Exchange exchange) {
-        super(message + ": " + exchange);
+        super(message, exchange);
     }
 
     public RollbackExchangeException(String message, Exchange exchange, Throwable cause) {
-        super(message + ": " + exchange, cause);
+        super(message, exchange, cause);
     }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Thu May  7 10:42:40 2009
@@ -23,8 +23,8 @@
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Thu May  7 10:42:40 2009
@@ -19,8 +19,8 @@
 import java.net.URI;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Component;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Thu May  7 10:42:40 2009
@@ -16,10 +16,10 @@
  */
 package org.apache.camel.impl;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java Thu May  7 10:42:40 2009
@@ -25,6 +25,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.NoTypeConversionAvailableException;
@@ -68,6 +69,7 @@
         addFallbackTypeConverter(new EnumTypeConverter());
         addFallbackTypeConverter(new ArrayTypeConverter());
         addFallbackTypeConverter(new PropertyEditorTypeConverter());
+        addFallbackTypeConverter(new FutureTypeConverter(this));
     }
 
     public List<TypeConverterLoader> getTypeConverterLoaders() {
@@ -82,6 +84,9 @@
         Object answer;
         try {
             answer = doConvertTo(type, exchange, value);
+        } catch (CamelExecutionException e) {
+            // rethrow the exception as it is not due to a failed conversion
+            throw e;
         } catch (Exception e) {
             // we cannot convert so return null
             if (LOG.isDebugEnabled()) {

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java Thu May  7 10:42:40 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.converter;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.StreamCache;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Type converter that resolves a {@link Future} to its result.
+ * <p/>
+ * Converting blocks until the task is done. A result that already is of the requested type
+ * is returned as is. A result that is an {@link Exchange} has its result body extracted.
+ * Anything else is handed to the delegate {@link TypeConverter} for conversion.
+ *
+ * @version $Revision$
+ */
+@Converter
+public final class FutureTypeConverter implements TypeConverter {
+
+    private static final Log LOG = LogFactory.getLog(FutureTypeConverter.class);
+
+    private final TypeConverter converter;
+
+    /**
+     * Creates the converter.
+     *
+     * @param converter the delegate used for converting the result of the future
+     */
+    public FutureTypeConverter(TypeConverter converter) {
+        this.converter = converter;
+    }
+
+    /**
+     * Resolves the value if it is a {@link Future} and converts its result.
+     *
+     * @param type     the requested type
+     * @param exchange the current exchange, can be <tt>null</tt>
+     * @param value    the value to convert
+     * @return the converted result, or <tt>null</tt> if the value is not a {@link Future},
+     *         the task was cancelled or its result was <tt>null</tt>
+     * @throws Exception if the task failed or the result could not be obtained
+     */
+    private <T> T doConvertTo(Class<T> type, Exchange exchange, Object value) throws Exception {
+        // stream caches are not handled by this converter
+        if (StreamCache.class.isAssignableFrom(value.getClass())) {
+            return null;
+        }
+
+        if (Future.class.isAssignableFrom(value.getClass())) {
+
+            Future future = (Future) value;
+
+            if (future.isCancelled()) {
+                return null;
+            }
+
+            // do some trace logging as the get is blocking until the response is ready
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Getting future response");
+            }
+
+            Object body;
+            try {
+                body = future.get();
+            } catch (ExecutionException e) {
+                // there is no exchange when invoked from the convertTo methods without one
+                if (exchange != null) {
+                    exchange.setException(e);
+                }
+                throw e;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Got future response");
+            }
+
+            if (body == null) {
+                return null;
+            }
+
+            // maybe the body is already the type we want
+            if (type.isInstance(body)) {
+                return type.cast(body);
+            } else if (body instanceof Exchange) {
+                Exchange result = (Exchange) body;
+                body = ExchangeHelper.extractResultBody(result, result.getPattern());
+            }
+
+            // no then try to lookup a type converter
+            return converter.convertTo(type, exchange, body);
+        }
+
+        return null;
+    }
+
+    public <T> T convertTo(Class<T> type, Object value) {
+        return convertTo(type, null, value);
+    }
+
+    public <T> T convertTo(Class<T> type, Exchange exchange, Object value) {
+        try {
+            return doConvertTo(type, exchange, value);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    public <T> T mandatoryConvertTo(Class<T> type, Object value) throws NoTypeConversionAvailableException {
+        return mandatoryConvertTo(type, null, value);
+    }
+
+    public <T> T mandatoryConvertTo(Class<T> type, Exchange exchange, Object value) throws NoTypeConversionAvailableException {
+        T answer;
+        try {
+            answer = doConvertTo(type, exchange, value);
+        } catch (Exception e) {
+            throw new NoTypeConversionAvailableException(value, type, e);
+        }
+
+        if (answer == null) {
+            throw new NoTypeConversionAvailableException(value, type);
+        }
+
+        return answer;
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java Thu May  7 10:42:40 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import java.util.concurrent.ExecutorService;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.Processor;
+import org.apache.camel.processor.AsyncProcessor;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+
+/**
+ * Represents an XML &lt;async/&gt; element
+ *
+ * @version $Revision$
+ */
+@XmlRootElement(name = "async")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AsyncDefinition extends OutputDefinition<ProcessorDefinition> {
+
+    @XmlTransient
+    private ExecutorService executorService;
+    @XmlAttribute(required = false)
+    private String executorServiceRef;
+    @XmlAttribute(required = false)
+    private Integer poolSize;
+    @XmlAttribute(required = false)
+    private Boolean waitForTaskToComplete = Boolean.TRUE;
+
+    @Override
+    public Processor createProcessor(RouteContext routeContext) throws Exception {
+        if (executorServiceRef != null) {
+            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+        }
+        if (executorService == null && poolSize != null) {
+            executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "AsyncProcessor", true);
+        }
+        // a null option (possible using the setter) means the default, which is to wait
+        boolean wait = waitForTaskToComplete == null || waitForTaskToComplete;
+        Processor childProcessor = routeContext.createProcessor(this);
+        return new AsyncProcessor(childProcessor, executorService, wait);
+    }
+
+    @Override
+    public String getLabel() {
+        return "async";
+    }
+
+    @Override
+    public String getShortName() {
+        return "async";
+    }
+
+    @Override
+    public String toString() {
+        return "Async[" + getOutputs() + "]";
+    }
+
+    /**
+     * Setting the executor service for executing the async tasks.
+     *
+     * @param executorService the executor service
+     * @return the builder
+     */
+    public AsyncDefinition executorService(ExecutorService executorService) {
+        setExecutorService(executorService);
+        return this;
+    }
+
+    /**
+     * Setting the core pool size for the underlying {@link java.util.concurrent.ExecutorService}.
+     *
+     * @param poolSize the core pool size
+     * @return the builder
+     */
+    public AsyncDefinition poolSize(int poolSize) {
+        setPoolSize(poolSize);
+        return this;
+    }
+
+    /**
+     * Setting whether to wait for the async task to complete before continuing the original route.
+     * <p/>
+     * Is default <tt>true</tt>
+     *
+     * @param complete whether to wait or not
+     * @return the builder
+     */
+    public AsyncDefinition waitForTaskToComplete(boolean complete) {
+        setWaitForTaskToComplete(complete);
+        return this;
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    public Integer getPoolSize() {
+        return poolSize;
+    }
+
+    public void setPoolSize(Integer poolSize) {
+        this.poolSize = poolSize;
+    }
+
+    public Boolean getWaitForTaskToComplete() {
+        return waitForTaskToComplete;
+    }
+
+    public void setWaitForTaskToComplete(Boolean waitForTaskToComplete) {
+        this.waitForTaskToComplete = waitForTaskToComplete;
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Thu May  7 10:42:40 2009
@@ -670,6 +670,33 @@
     }
 
     /**
+     * Breaks the route into asynchronous processing. The caller thread will end and the OUT message will
+     * contain a {@link java.util.concurrent.Future} handle so you can get the real response
+     * later using this handle.
+     *
+     * @return the builder
+     */
+    public AsyncDefinition async() {
+        AsyncDefinition answer = new AsyncDefinition();
+        addOutput(answer);
+        return answer;
+    }
+
+    /**
+     * Breaks the route into asynchronous processing. The caller thread will end and the OUT message will
+     * contain a {@link java.util.concurrent.Future} handle so you can get the real response
+     * later using this handle.
+     *
+     * @param poolSize the core pool size
+     * @return the builder
+     */
+    public AsyncDefinition async(int poolSize) {
+        AsyncDefinition answer = async();
+        answer.setPoolSize(poolSize);
+        return answer;
+    }
+
+    /**
      * Ends the current block
      *
      * @return the builder

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java Thu May  7 10:42:40 2009
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+
+/**
+ * Async processor that routes a copy of the exchange using a task submitted to an
+ * {@link ExecutorService}.
+ * <p/>
+ * When waiting for the task to complete the caller thread blocks until the task is done and,
+ * if the exchange is out capable, the results are copied back to the original exchange.
+ * Otherwise the caller thread continues at once and receives a <tt>Future&lt;Exchange&gt;</tt>
+ * in the OUT message body, which it can use later to obtain the async response.
+ * <p/>
+ * Camel also provides type converters so you can just ask to get the desired object type and Camel
+ * will automatically wait for the async task to complete to return the response.
+ *
+ * @version $Revision$
+ */
+public class AsyncProcessor extends DelegateProcessor implements Processor {
+
+    private static final int DEFAULT_THREADPOOL_SIZE = 5;
+    private ExecutorService executorService;
+    private boolean waitTaskComplete;
+
+    /**
+     * Creates the async processor.
+     *
+     * @param output           the processor to route the exchange to
+     * @param executorService  the executor service to use, if <tt>null</tt> a default one is created lazily
+     * @param waitTaskComplete whether the caller thread waits for the task to complete
+     */
+    public AsyncProcessor(Processor output, ExecutorService executorService, boolean waitTaskComplete) {
+        super(output);
+        this.executorService = executorService;
+        this.waitTaskComplete = waitTaskComplete;
+    }
+
+    public void process(final Exchange exchange) throws Exception {
+        final Processor output = getProcessor();
+        if (output == null) {
+            // no output then return
+            return;
+        }
+
+        // use a new copy of the exchange to route async
+        final Exchange copy = exchange.newCopy();
+
+        // let it execute async and return the Future
+        Callable<Exchange> task = new Callable<Exchange>() {
+            public Exchange call() throws Exception {
+                // must use a copy of the original exchange for processing async
+                output.process(copy);
+                return copy;
+            }
+        };
+
+        // submit the task
+        Future<Exchange> future = getExecutorService().submit(task);
+
+        // TODO: Support exchange headers for wait and timeout values, see Exchange constants
+
+        if (waitTaskComplete) {
+            // wait for task to complete
+            Exchange response = future.get();
+            // if we are out capable then set the response on the original exchange
+            if (ExchangeHelper.isOutCapable(exchange)) {
+                ExchangeHelper.copyResults(exchange, response);
+            }
+        } else {
+            // no we do not expect a reply so lets continue, set a handle to the future task
+            // in case end user need it later
+            exchange.getOut().setBody(future);
+        }
+    }
+
+    /**
+     * Gets the executor service, creating the default one on first use if none was provided.
+     * <p/>
+     * Is synchronized so concurrent callers do not each create a thread pool of their own.
+     *
+     * @return the executor service
+     */
+    public synchronized ExecutorService getExecutorService() {
+        if (executorService == null) {
+            executorService = createExecutorService();
+        }
+        return executorService;
+    }
+
+    private ExecutorService createExecutorService() {
+        return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "AsyncProcessor", true);
+    }
+
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (executorService != null) {
+            executorService.shutdown();
+        }
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Thu May  7 10:42:40 2009
@@ -20,6 +20,7 @@
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -31,7 +32,6 @@
 import org.apache.camel.NoSuchPropertyException;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConverter;
-import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
 
 /**
  * Some helper methods for working with {@link Exchange} objects
@@ -397,21 +397,22 @@
     }
 
     /**
-     * Extracts the body from the given result.
+     * Extracts the body from the given exchange.
      * <p/>
      * If the exchange pattern is provided it will try to honor it and retrive the body
      * from either IN or OUT according to the pattern.
      *
-     * @param exchange   the result
-     * @param pattern  exchange pattern if given, can be <tt>null</tt>
-     * @return  the result, can be <tt>null</tt>.
+     * @param exchange   the exchange
+     * @param pattern    exchange pattern if given, can be <tt>null</tt>
+     * @return the result body, can be <tt>null</tt>.
+     * @throws CamelExecutionException if the processing of the exchange failed
      */
     public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
         Object answer = null;
         if (exchange != null) {
             // rethrow if there was an exception
             if (exchange.getException() != null) {
-                throw wrapRuntimeCamelException(exchange.getException());
+                throw new CamelExecutionException("Exception occured during execution ", exchange, exchange.getException());
             }
 
             // result could have a fault message

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Thu May  7 10:42:40 2009
@@ -18,8 +18,8 @@
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 
 /**
  * Helper for {@link java.util.concurrent.ExecutorService} to construct executors using a thread factory that
@@ -38,7 +38,7 @@
      * Creates a new thread name with the given prefix
      */
     protected static String getThreadName(String name) {
-        return "Camel " + name + " thread:" + nextThreadCounter();
+        return "Camel thread " + nextThreadCounter() + ": " + name;
     }
 
     protected static synchronized int nextThreadCounter() {

Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Thu May  7 10:42:40 2009
@@ -15,6 +15,7 @@
 ## limitations under the License.
 ## ------------------------------------------------------------------------
 AggregateDefinition
+AsyncDefinition
 BeanDefinition
 CatchDefinition
 ChoiceDefinition

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java (from r772297, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java&r1=772297&r2=772598&rev=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java Thu May  7 10:42:40 2009
@@ -16,17 +16,44 @@
  */
 package org.apache.camel.processor;
 
+import java.io.File;
+
+import org.w3c.dom.NodeList;
+
+import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Based on user forum trouble
+ */
+public class TransformXpathTest extends ContextTestSupport {
+
+    public void testTransformWithXpath() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.message(0).body().isInstanceOf(NodeList.class);
 
-public class TransformProcessorTest extends TransformViaDSLTest {
+        String xml = context.getTypeConverter().convertTo(String.class, new File("src/test/resources/org/apache/camel/processor/students.xml"));
+
+        template.sendBody("direct:start", xml);
+
+        assertMockEndpointsSatisfied();
+
+        NodeList list = mock.getReceivedExchanges().get(0).getIn().getBody(NodeList.class);
+        assertEquals(2, list.getLength());
+
+        assertEquals("Claus", context.getTypeConverter().convertTo(String.class, list.item(0).getTextContent().trim()));
+        assertEquals("Hadrian", context.getTypeConverter().convertTo(String.class, list.item(1).getTextContent().trim()));
+    }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                // START SNIPPET: example
-                from("direct:start").transform(body().append(" World!")).to("mock:result");
-                // END SNIPPET: example
+                from("direct:start")
+                    .transform().xpath("//students/student")
+                    .to("mock:result");
             }
         };
     }
-}
+}
\ No newline at end of file

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java Thu May  7 10:42:40 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test to verify that error handling using async() also works as expected.
+ *
+ * @version $Revision$
+ */
+public class AsyncErrorHandlerTest extends ContextTestSupport {
+
+    public void testAsyncErrorHandler() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+
+        MockEndpoint mock = getMockEndpoint("mock:dead");
+        mock.expectedMessageCount(1);
+        mock.message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        mock.message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+
+        template.sendBody("direct:in", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).delay(0).logStackTrace(false));
+
+                from("direct:in")
+                    .async(2)
+                    .to("mock:foo")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            throw new Exception("Forced exception by unit test");
+                        }
+                    });
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java Thu May  7 10:42:40 2009
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncNoWaitRouteTest extends ContextTestSupport {
+
+    private String route = "";
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        route = "";
+    }
+
+    public void testAsyncNoWaitRouteExchange() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        // send an in out to the direct endpoint using the classic API
+        Exchange exchange = template.send("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.setPattern(ExchangePattern.InOut);
+                exchange.getIn().setBody("Hello");
+            }
+        });
+
+        // we should run before the async processor that sets B
+        route += "A";
+
+        // as it turns into an async route later we get a Future in the OUT body
+        Object out = exchange.getOut().getBody();
+        assertIsInstanceOf(Future.class, out);
+
+        // cast to future
+        Future future = (Future) out;
+
+        assertFalse("Should not be done", future.isDone());
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("AB", route);
+
+        // get the response from the future
+        String response = context.getTypeConverter().convertTo(String.class, future);
+        assertEquals("Bye World", response);
+    }
+
+    public void testAsyncNoWaitRoute() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        // send a request reply to the direct start endpoint
+        Object out = template.requestBody("direct:start", "Hello");
+
+        // we should run before the async processor that sets B
+        route += "A";
+
+        // as it turns into an async route later we get a Future as response
+        assertIsInstanceOf(Future.class, out);
+
+        // cast to future
+        Future future = (Future) out;
+
+        assertFalse("Should not be done", future.isDone());
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("AB", route);
+
+        // get the response from the future
+        String response = context.getTypeConverter().convertTo(String.class, future);
+        assertEquals("Bye World", response);
+    }
+
+    public void testAsyncRouteNoWaitWithTypeConverted() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        // send a request reply to the direct start endpoint, but use the
+        // future type converter, which waits for the response even though
+        // the async is set to not wait
+        String response = template.requestBody("direct:start", "Hello", String.class);
+
+        // we should wait for the async response as we ask for the result as a String body
+        route += "A";
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("Bye World", response);
+        assertEquals("BA", route);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // we start this route async
+                from("direct:start")
+                        // we play a bit with the message
+                        .transform(body().append(" World"))
+                                // now turn the route into async from this point forward
+                                // the caller will have a Future<Exchange> returned as response in OUT
+                                // to be used to grab the async response when he feels like it
+                                // we do not want to wait for tasks to be complete so we instruct Camel
+                                // to not wait, and therefore Camel returns the Future<Exchange> handle we
+                                // can use to get the result when we want
+                        .async().waitForTaskToComplete(false)
+                                // from this point forward this is the async route doing its work
+                                // so we do a bit of delay to simulate heavy work that takes time
+                        .to("mock:foo")
+                        .delay(100)
+                                // and we also work with the message so we can prepare a response
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                route += "B";
+                                assertEquals("Hello World", exchange.getIn().getBody());
+                                exchange.getOut().setBody("Bye World");
+                            }
+                            // and we use mocks for unit testing
+                        }).to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java Thu May  7 10:42:40 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.concurrent.Future;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncRouteNoWaitWithErrorTest extends ContextTestSupport {
+
+    private String route = "";
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        route = "";
+    }
+
+    public void testAsyncRouteWithError() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        // send a request reply to the direct start endpoint
+        Object out = template.requestBody("direct:start", "Hello");
+
+        // we should run before the async processor that sets B
+        route += "A";
+
+        // as it turns into an async route later we get a Future as response
+        assertIsInstanceOf(Future.class, out);
+
+        // cast to future
+        Future future = (Future) out;
+
+        assertFalse("Should not be done", future.isDone());
+
+        assertMockEndpointsSatisfied();
+
+        // get the response from the future
+        try {
+            String response = context.getTypeConverter().convertTo(String.class, future);
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            // expected an execution exception
+            assertEquals("Damn forced by unit test", e.getCause().getMessage());
+        }
+
+        assertEquals("AB", route);
+    }
+
+    public void testAsyncRouteWithTypeConverted() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        // send a request reply to the direct start endpoint, but will use
+        // future type converter that will wait for the response
+        try {
+            String response = template.requestBody("direct:start", "Hello", String.class);
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            // expected an execution exception
+            assertEquals("Damn forced by unit test", e.getCause().getMessage());
+        }
+
+        // we should wait for the async response as we ask for the result as a String body
+        route += "A";
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("BA", route);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // we start this route async
+                from("direct:start")
+                        // we play a bit with the message
+                        .transform(body().append(" World"))
+                                // now turn the route into async from this point forward
+                                // the caller will have a Future<Exchange> returned as response in OUT
+                                // to be used to grab the async response when he feels like it
+                        .async().waitForTaskToComplete(false)
+                                // from this point forward this is the async route doing its work
+                                // so we do a bit of delay to simulate heavy work that takes time
+                        .to("mock:foo")
+                        .delay(100)
+                                // and we also work with the message so we can prepare a response
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                route += "B";
+                                assertEquals("Hello World", exchange.getIn().getBody());
+                                throw new IllegalArgumentException("Damn forced by unit test");
+                            }
+                            // and we use mocks for unit testing
+                        }).to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java Thu May  7 10:42:40 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncRouteTest extends ContextTestSupport {
+
+    private String route = "";
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        route = "";
+    }
+
+    public void testAsyncRoute() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        // send a request reply to the direct start endpoint
+        // it will wait for the async response so we get the full response
+        Object out = template.requestBody("direct:start", "Hello");
+
+        // the async processor has already set B, as we waited for the response
+        route += "A";
+
+        // as we waited for the async response we get the String body, not a Future
+        assertIsInstanceOf(String.class, out);
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("BA", route);
+
+        // convert the response to a String
+        String response = context.getTypeConverter().convertTo(String.class, out);
+        assertEquals("Bye World", response);
+    }
+
+    public void testAsyncRouteWithTypeConverted() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        // send a request reply to the direct start endpoint, but will use
+        // future type converter that will wait for the response
+        String response = template.requestBody("direct:start", "Hello", String.class);
+
+        // we should wait for the async response as we ask for the result as a String body
+        route += "A";
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("Bye World", response);
+        assertEquals("BA", route);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // we start this route async
+                from("direct:start")
+                        // we play a bit with the message
+                        .transform(body().append(" World"))
+                                // now turn the route into async from this point forward
+                                // the caller will have a Future<Exchange> returned as response in OUT
+                                // to be used to grab the async response when he feels like it
+                        .async()
+                                // from this point forward this is the async route doing its work
+                                // so we do a bit of delay to simulate heavy work that takes time
+                        .to("mock:foo")
+                        .delay(100)
+                                // and we also work with the message so we can prepare a response
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                route += "B";
+                                assertEquals("Hello World", exchange.getIn().getBody());
+                                exchange.getOut().setBody("Bye World");
+                            }
+                            // and we use mocks for unit testing
+                        }).to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date