You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/07 12:42:44 UTC
svn commit: r772598 [1/2] - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/component/seda/
main/java/org/apache/camel/impl/ main/java/org/apache/camel/impl/converter/
main/java/org/apache/camel/model/ main/java/o...
Author: davsclaus
Date: Thu May 7 10:42:40 2009
New Revision: 772598
URL: http://svn.apache.org/viewvc?rev=772598&view=rev
Log:
CAMEL-1572: Added first cut of async DSL. Fixed CS.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java
- copied, changed from r772297, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteWithErrorTest.java (with props)
camel/trunk/camel-core/src/test/resources/org/apache/camel/processor/students.xml (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java Thu May 7 10:42:40 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Exception occured during execution/processing of an {@link Exchange}.
+ * <p/>
+ * Is usually thrown to the caller when using the {@link org.apache.camel.ProducerTemplate}
+ * to send messages to Camel.
+ *
+ * @version $Revision$
+ */
+public class CamelExecutionException extends RuntimeExchangeException {
+
+ public CamelExecutionException(String message, Exchange exchange) {
+ super(message, exchange);
+ }
+
+ public CamelExecutionException(String message, Exchange exchange, Throwable cause) {
+ super(message, exchange, cause);
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu May 7 10:42:40 2009
@@ -30,6 +30,9 @@
*/
public interface Exchange {
+ String ASYNC_WAIT = "CamelAsyncWait";
+ String ASYNC_WAIT_TIMEOUT = "CamelAsyncWaitTimeout";
+
String BEAN_METHOD_NAME = "CamelBeanMethodName";
String BEAN_HOLDER = "CamelBeanHolder";
String BEAN_MULTI_PARAMETER_ARRAY = "CamelBeanMultiParameterArray";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java Thu May 7 10:42:40 2009
@@ -29,13 +29,13 @@
}
public InvalidPayloadException(Exchange exchange, Class<?> type, Message message) {
- super("No body available of type: " + type.getName()
+ super("No body available of type: " + type.getCanonicalName()
+ NoSuchPropertyException.valueDescription(message.getBody()) + " on: " + message, exchange);
this.type = type;
}
public InvalidPayloadException(Exchange exchange, Class<?> type, Message message, Throwable cause) {
- super("No body available of type: " + type.getName()
+ super("No body available of type: " + type.getCanonicalName()
+ NoSuchPropertyException.valueDescription(message.getBody()) + " on: " + message
+ ". Caused by: " + cause.getMessage(), exchange, cause);
this.type = type;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Thu May 7 10:42:40 2009
@@ -70,37 +70,53 @@
/**
* Sends the body to the default endpoint
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param body the payload to send
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBody(Object body);
/**
* Sends the body to the default endpoint with a specified header and header
* value
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param body the payload to send
* @param header the header name
* @param headerValue the header value
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBodyAndHeader(Object body, String header, Object headerValue);
/**
* Sends the body to the default endpoint with a specified property and property
* value
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param body the payload to send
* @param property the property name
* @param propertyValue the property value
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBodyAndProperty(Object body, String property, Object propertyValue);
/**
* Sends the body to the default endpoint with the specified headers and
* header values
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param body the payload to send
* @param headers the headers
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBodyAndHeaders(Object body, Map<String, Object> headers);
@@ -171,65 +187,95 @@
/**
* Send the body to an endpoint
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the endpoint to send the exchange to
* @param body the payload
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBody(Endpoint endpoint, Object body);
/**
* Send the body to an endpoint
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send the exchange to
* @param body the payload
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBody(String endpointUri, Object body);
/**
* Send the body to an endpoint with the given {@link ExchangePattern}
* returning any result output body
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the endpoint to send the exchange to
* @param body the payload
* @param pattern the message {@link ExchangePattern} such as
* {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
* @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body);
/**
* Send the body to an endpoint returning any result output body
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send the exchange to
* @param pattern the message {@link ExchangePattern} such as
* {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
* @param body the payload
* @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object sendBody(String endpointUri, ExchangePattern pattern, Object body);
/**
* Sends the body to an endpoint with a specified header and header value
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send to
* @param body the payload to send
* @param header the header name
* @param headerValue the header value
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBodyAndHeader(String endpointUri, Object body, String header, Object headerValue);
/**
* Sends the body to an endpoint with a specified header and header value
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the Endpoint to send to
* @param body the payload to send
* @param header the header name
* @param headerValue the header value
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue);
/**
* Sends the body to an endpoint with a specified header and header value
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the Endpoint to send to
* @param pattern the message {@link ExchangePattern} such as
@@ -238,12 +284,16 @@
* @param header the header name
* @param headerValue the header value
* @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, Object body,
String header, Object headerValue);
/**
* Sends the body to an endpoint with a specified header and header value
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the Endpoint URI to send to
* @param pattern the message {@link ExchangePattern} such as
@@ -252,32 +302,44 @@
* @param header the header name
* @param headerValue the header value
* @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, Object body,
String header, Object headerValue);
/**
* Sends the body to an endpoint with a specified property and property value
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send to
* @param body the payload to send
* @param property the property name
* @param propertyValue the property value
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBodyAndProperty(String endpointUri, Object body, String property, Object propertyValue);
/**
* Sends the body to an endpoint with a specified property and property value
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the Endpoint to send to
* @param body the payload to send
* @param property the property name
* @param propertyValue the property value
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBodyAndProperty(Endpoint endpoint, Object body, String property, Object propertyValue);
/**
* Sends the body to an endpoint with a specified property and property value
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the Endpoint to send to
* @param pattern the message {@link ExchangePattern} such as
@@ -286,12 +348,16 @@
* @param property the property name
* @param propertyValue the property value
* @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, Object body,
String property, Object propertyValue);
/**
* Sends the body to an endpoint with a specified property and property value
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the Endpoint URI to send to
* @param pattern the message {@link ExchangePattern} such as
@@ -300,33 +366,42 @@
* @param property the property name
* @param propertyValue the property value
* @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, Object body,
String property, Object propertyValue);
/**
- * Sends the body to an endpoint with the specified headers and header
- * values
+ * Sends the body to an endpoint with the specified headers and header values
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send to
* @param body the payload to send
* @param headers headers
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers);
/**
- * Sends the body to an endpoint with the specified headers and header
- * values
+ * Sends the body to an endpoint with the specified headers and header values
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the endpoint URI to send to
* @param body the payload to send
* @param headers headers
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
void sendBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers);
/**
- * Sends the body to an endpoint with the specified headers and header
- * values
+ * Sends the body to an endpoint with the specified headers and header values
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send to
* @param pattern the message {@link ExchangePattern} such as
@@ -334,13 +409,16 @@
* @param body the payload to send
* @param headers headers
* @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body,
Map<String, Object> headers);
/**
- * Sends the body to an endpoint with the specified headers and header
- * values
+ * Sends the body to an endpoint with the specified headers and header values
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the endpoint URI to send to
* @param pattern the message {@link ExchangePattern} such as
@@ -348,6 +426,7 @@
* @param body the payload to send
* @param headers headers
* @return the result if {@link ExchangePattern} is OUT capable, otherwise <tt>null</tt>
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, Object body,
Map<String, Object> headers);
@@ -379,79 +458,110 @@
/**
* Sends the body to the default endpoint and returns the result content
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param body the payload to send
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object requestBody(Object body);
/**
* Sends the body to the default endpoint and returns the result content
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param body the payload to send
* @param type the expected response type
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
<T> T requestBody(Object body, Class<T> type);
/**
* Send the body to an endpoint returning any result output body.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the Endpoint to send to
* @param body the payload
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object requestBody(Endpoint endpoint, Object body);
/**
* Send the body to an endpoint returning any result output body.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the Endpoint to send to
* @param body the payload
* @param type the expected response type
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
<T> T requestBody(Endpoint endpoint, Object body, Class<T> type);
/**
* Send the body to an endpoint returning any result output body.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send to
* @param body the payload
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object requestBody(String endpointUri, Object body);
/**
* Send the body to an endpoint returning any result output body.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send to
* @param body the payload
* @param type the expected response type
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
<T> T requestBody(String endpointUri, Object body, Class<T> type);
/**
* Send the body to an endpoint returning any result output body.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the Endpoint to send to
* @param body the payload
* @param header the header name
* @param headerValue the header value
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue);
/**
* Send the body to an endpoint returning any result output body.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the Endpoint to send to
* @param body the payload
@@ -459,24 +569,32 @@
* @param headerValue the header value
* @param type the expected response type
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
<T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type);
/**
* Send the body to an endpoint returning any result output body.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send to
* @param body the payload
* @param header the header name
* @param headerValue the header value
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue);
/**
* Send the body to an endpoint returning any result output body.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send to
* @param body the payload
@@ -484,56 +602,69 @@
* @param headerValue the header value
* @param type the expected response type
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
<T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type);
/**
- * Sends the body to an endpoint with the specified headers and header
- * values.
+ * Sends the body to an endpoint with the specified headers and header values.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send to
* @param body the payload to send
* @param headers headers
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers);
/**
- * Sends the body to an endpoint with the specified headers and header
- * values.
+ * Sends the body to an endpoint with the specified headers and header values.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpointUri the endpoint URI to send to
* @param body the payload to send
* @param headers headers
* @param type the expected response type
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
<T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type);
/**
- * Sends the body to an endpoint with the specified headers and header
- * values.
+ * Sends the body to an endpoint with the specified headers and header values.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the endpoint URI to send to
* @param body the payload to send
* @param headers headers
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
Object requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers);
/**
- * Sends the body to an endpoint with the specified headers and header
- * values.
+ * Sends the body to an endpoint with the specified headers and header values.
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param endpoint the endpoint URI to send to
* @param body the payload to send
* @param headers headers
* @param type the expected response type
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
<T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type);
@@ -650,15 +781,22 @@
/**
* Gets the response body from the future handle, will wait until the response is ready.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param future the handle to get the response
* @param type the expected response type
* @return the result (see class javadoc)
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
<T> T asyncExtractBody(Future future, Class<T> type);
/**
* Gets the response body from the future handle, will wait at most the given time for the response to be ready.
+ * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception
+ * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with
+ * the caused exception wrapped.
*
* @param future the handle to get the response
* @param timeout the maximum time to wait
@@ -666,6 +804,7 @@
* @param type the expected response type
* @return the result (see class javadoc)
* @throws java.util.concurrent.TimeoutException if the wait timed out
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
<T> T asyncExtractBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java Thu May 7 10:42:40 2009
@@ -21,22 +21,22 @@
*
* @version $Revision$
*/
-public class RollbackExchangeException extends Exception {
+public class RollbackExchangeException extends CamelExchangeException {
public RollbackExchangeException(Exchange exchange) {
- this("Intended rollback on exchange", exchange);
+ this("Intended rollback", exchange);
}
public RollbackExchangeException(Exchange exchange, Throwable cause) {
- this("Intended rollback on exchange", exchange, cause);
+ this("Intended rollback", exchange, cause);
}
public RollbackExchangeException(String message, Exchange exchange) {
- super(message + ": " + exchange);
+ super(message, exchange);
}
public RollbackExchangeException(String message, Exchange exchange, Throwable cause) {
- super(message + ": " + exchange, cause);
+ super(message, exchange, cause);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Thu May 7 10:42:40 2009
@@ -23,8 +23,8 @@
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Thu May 7 10:42:40 2009
@@ -19,8 +19,8 @@
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Thu May 7 10:42:40 2009
@@ -16,10 +16,10 @@
*/
package org.apache.camel.impl;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java Thu May 7 10:42:40 2009
@@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.NoFactoryAvailableException;
import org.apache.camel.NoTypeConversionAvailableException;
@@ -68,6 +69,7 @@
addFallbackTypeConverter(new EnumTypeConverter());
addFallbackTypeConverter(new ArrayTypeConverter());
addFallbackTypeConverter(new PropertyEditorTypeConverter());
+ addFallbackTypeConverter(new FutureTypeConverter(this));
}
public List<TypeConverterLoader> getTypeConverterLoaders() {
@@ -82,6 +84,9 @@
Object answer;
try {
answer = doConvertTo(type, exchange, value);
+ } catch (CamelExecutionException e) {
+ // rethrow exception exception as its not due to failed convertion
+ throw e;
} catch (Exception e) {
// we cannot convert so return null
if (LOG.isDebugEnabled()) {
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java Thu May 7 10:42:40 2009
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.converter;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.StreamCache;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Future type converter.
+ *
+ * @version $Revision$
+ */
+@Converter
+public final class FutureTypeConverter implements TypeConverter {
+
+ private static final Log LOG = LogFactory.getLog(FutureTypeConverter.class);
+
+ private final TypeConverter converter;
+
+ public FutureTypeConverter(TypeConverter converter) {
+ this.converter = converter;
+ }
+
+ private <T> T doConvertTo(Class<T> type, Exchange exchange, Object value) throws Exception {
+ // do not convert to stream cache
+ if (StreamCache.class.isAssignableFrom(value.getClass())) {
+ return null;
+ }
+
+ if (Future.class.isAssignableFrom(value.getClass())) {
+
+ Future future = (Future) value;
+
+ if (future.isCancelled()) {
+ return null;
+ }
+
+ // do some trace logging as the get is blocking until the response is ready
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Getting future response");
+ }
+
+ Object body;
+ try {
+ body = future.get();
+ } catch (ExecutionException e) {
+ exchange.setException(e);
+ throw e;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Got future response");
+ }
+
+ if (body == null) {
+ return null;
+ }
+
+ Class from = body.getClass();
+
+ // maybe from is already the type we want
+ if (from.isAssignableFrom(type)) {
+ return type.cast(from);
+ } else if (body instanceof Exchange) {
+ Exchange result = (Exchange) body;
+ body = ExchangeHelper.extractResultBody(result, result.getPattern());
+ }
+
+ // no then try to lookup a type converter
+ return converter.convertTo(type, exchange, body);
+ }
+
+ return null;
+ }
+
+ public <T> T convertTo(Class<T> type, Object value) {
+ return convertTo(type, null, value);
+ }
+
+ public <T> T convertTo(Class<T> type, Exchange exchange, Object value) {
+ try {
+ return doConvertTo(type, exchange, value);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ public <T> T mandatoryConvertTo(Class<T> type, Object value) throws NoTypeConversionAvailableException {
+ return mandatoryConvertTo(type, null, value);
+ }
+
+ public <T> T mandatoryConvertTo(Class<T> type, Exchange exchange, Object value) throws NoTypeConversionAvailableException {
+ T answer;
+ try {
+ answer = doConvertTo(type, exchange, value);
+ } catch (Exception e) {
+ throw new NoTypeConversionAvailableException(value, type, e);
+ }
+
+ if (answer == null) {
+ throw new NoTypeConversionAvailableException(value, type);
+ }
+
+ return answer;
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java Thu May 7 10:42:40 2009
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import java.util.concurrent.ExecutorService;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.Processor;
+import org.apache.camel.processor.AsyncProcessor;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+
+/**
+ * Represents an XML <async/> element
+ *
+ * @version $Revision$
+ */
+@XmlRootElement(name = "async")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AsyncDefinition extends OutputDefinition<ProcessorDefinition> {
+
+ @XmlTransient
+ private ExecutorService executorService;
+ @XmlAttribute(required = false)
+ private String executorServiceRef;
+ @XmlAttribute(required = false)
+ private Integer poolSize;
+ @XmlAttribute(required = false)
+ private Boolean waitForTaskToComplete = Boolean.TRUE;
+
+ @Override
+ public Processor createProcessor(RouteContext routeContext) throws Exception {
+ if (executorServiceRef != null) {
+ executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+ }
+ if (executorService == null && poolSize != null) {
+ executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "AsyncProcessor", true);
+ }
+ Processor childProcessor = routeContext.createProcessor(this);
+ return new AsyncProcessor(childProcessor, executorService, waitForTaskToComplete);
+ }
+
+ @Override
+ public String getLabel() {
+ return "async";
+ }
+
+ @Override
+ public String getShortName() {
+ return "async";
+ }
+
+ @Override
+ public String toString() {
+ return "Async[" + getOutputs() + "]";
+ }
+
+ /**
+ * Setting the executor service for executing the multicasting action.
+ *
+ * @return the builder
+ */
+ public AsyncDefinition executorService(ExecutorService executorService) {
+ setExecutorService(executorService);
+ return this;
+ }
+
+ /**
+ * Setting the core pool size for the underlying {@link java.util.concurrent.ExecutorService}.
+ *
+ * @return the builder
+ */
+ public AsyncDefinition poolSize(int poolSize) {
+ setPoolSize(poolSize);
+ return this;
+ }
+
+ /**
+ * Setting to whether to wait for async tasks to be complete before continuing original route.
+ * <p/>
+ * Is default <tt>true</tt>
+ *
+ * @param complete whether to wait or not
+ * @return the builder
+ */
+ public AsyncDefinition waitForTaskToComplete(boolean complete) {
+ setWaitForTaskToComplete(complete);
+ return this;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public Integer getPoolSize() {
+ return poolSize;
+ }
+
+ public void setPoolSize(Integer poolSize) {
+ this.poolSize = poolSize;
+ }
+
+ public Boolean getWaitForTaskToComplete() {
+ return waitForTaskToComplete;
+ }
+
+ public void setWaitForTaskToComplete(Boolean waitForTaskToComplete) {
+ this.waitForTaskToComplete = waitForTaskToComplete;
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Thu May 7 10:42:40 2009
@@ -670,6 +670,33 @@
}
/**
+ * Breaks the route into asynchronous. The caller thread will end and the OUT message will
+ * contain a {@link java.util.concurrent.Future} handle so you can get the real response
+ * later using this handle.
+ *
+ * @return the builder
+ */
+ public AsyncDefinition async() {
+ AsyncDefinition answer = new AsyncDefinition();
+ addOutput(answer);
+ return answer;
+ }
+
+ /**
+ * Breaks the route into asynchronous. The caller thread will end and the OUT message will
+ * contain a {@link java.util.concurrent.Future} handle so you can get the real response
+ * later using this handle.
+ *
+ * @param poolSize the core pool size
+ * @return the builder
+ */
+ public AsyncDefinition async(int poolSize) {
+ AsyncDefinition answer = async();
+ answer.setPoolSize(poolSize);
+ return answer;
+ }
+
+ /**
* Ends the current block
*
* @return the builder
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java Thu May 7 10:42:40 2009
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+
+/**
+ * Async processor that turns the processing going forward into async mode.
+ * <p/>
+ * The original caller thread will receive a <tt>Future<Exchange></tt> in the OUT message body.
+ * It can then later use this handle to obtain the async response.
+ * <p/>
+ * Camel also provides type converters so you can just ask to get the desired object type and Camel
+ * will automatic wait for the async task to complete to return the response.
+ *
+ * @version $Revision$
+ */
+public class AsyncProcessor extends DelegateProcessor implements Processor {
+
+ private static final int DEFAULT_THREADPOOL_SIZE = 5;
+ private ExecutorService executorService;
+ private boolean waitTaskComplete;
+
+ public AsyncProcessor(Processor output, ExecutorService executorService, boolean waitTaskComplete) {
+ super(output);
+ this.executorService = executorService;
+ this.waitTaskComplete = waitTaskComplete;
+ }
+
+ public void process(final Exchange exchange) throws Exception {
+ final Processor output = getProcessor();
+ if (output == null) {
+ // no output then return
+ return;
+ }
+
+ // use a new copy of the exchange to route async
+ final Exchange copy = exchange.newCopy();
+
+ // let it execute async and return the Future
+ Callable<Exchange> task = new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ // must use a copy of the original exchange for processing async
+ output.process(copy);
+ return copy;
+ }
+ };
+
+ // sumbit the task
+ Future<Exchange> future = getExecutorService().submit(task);
+
+ // TODO: Support exchange headers for wait and timeout values, see Exchange constants
+
+ if (waitTaskComplete) {
+ // wait for task to complete
+ Exchange response = future.get();
+ // if we are out capable then set the response on the original exchange
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ ExchangeHelper.copyResults(exchange, response);
+ }
+ } else {
+ // no we do not expect a reply so lets continue, set a handle to the future task
+ // in case end user need it later
+ exchange.getOut().setBody(future);
+ }
+ }
+
+ public ExecutorService getExecutorService() {
+ if (executorService == null) {
+ executorService = createExecutorService();
+ }
+ return executorService;
+ }
+
+ private ExecutorService createExecutorService() {
+ return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "AsyncProcessor", true);
+ }
+
+ protected void doStop() throws Exception {
+ super.doStop();
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Thu May 7 10:42:40 2009
@@ -20,6 +20,7 @@
import java.util.Map;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -31,7 +32,6 @@
import org.apache.camel.NoSuchPropertyException;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.TypeConverter;
-import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
/**
* Some helper methods for working with {@link Exchange} objects
@@ -397,21 +397,22 @@
}
/**
- * Extracts the body from the given result.
+ * Extracts the body from the given exchange.
* <p/>
* If the exchange pattern is provided it will try to honor it and retrive the body
* from either IN or OUT according to the pattern.
*
- * @param exchange the result
- * @param pattern exchange pattern if given, can be <tt>null</tt>
- * @return the result, can be <tt>null</tt>.
+ * @param exchange the exchange
+ * @param pattern exchange pattern if given, can be <tt>null</tt>
+ * @return the result body, can be <tt>null</tt>.
+ * @throws CamelExecutionException if the processing of the exchange failed
*/
public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
Object answer = null;
if (exchange != null) {
// rethrow if there was an exception
if (exchange.getException() != null) {
- throw wrapRuntimeCamelException(exchange.getException());
+ throw new CamelExecutionException("Exception occured during execution ", exchange, exchange.getException());
}
// result could have a fault message
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Thu May 7 10:42:40 2009
@@ -18,8 +18,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
/**
* Helper for {@link java.util.concurrent.ExecutorService} to construct executors using a thread factory that
@@ -38,7 +38,7 @@
* Creates a new thread name with the given prefix
*/
protected static String getThreadName(String name) {
- return "Camel " + name + " thread:" + nextThreadCounter();
+ return "Camel thread " + nextThreadCounter() + ": " + name;
}
protected static synchronized int nextThreadCounter() {
Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=772598&r1=772597&r2=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Thu May 7 10:42:40 2009
@@ -15,6 +15,7 @@
## limitations under the License.
## ------------------------------------------------------------------------
AggregateDefinition
+AsyncDefinition
BeanDefinition
CatchDefinition
ChoiceDefinition
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java (from r772297, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java&r1=772297&r2=772598&rev=772598&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java Thu May 7 10:42:40 2009
@@ -16,17 +16,44 @@
*/
package org.apache.camel.processor;
+import java.io.File;
+
+import org.w3c.dom.NodeList;
+
+import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Based on user forum trouble
+ */
+public class TransformXpathTest extends ContextTestSupport {
+
+ public void testTransformWithXpath() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ mock.message(0).body().isInstanceOf(NodeList.class);
-public class TransformProcessorTest extends TransformViaDSLTest {
+ String xml = context.getTypeConverter().convertTo(String.class, new File("src/test/resources/org/apache/camel/processor/students.xml"));
+
+ template.sendBody("direct:start", xml);
+
+ assertMockEndpointsSatisfied();
+
+ NodeList list = mock.getReceivedExchanges().get(0).getIn().getBody(NodeList.class);
+ assertEquals(2, list.getLength());
+
+ assertEquals("Claus", context.getTypeConverter().convertTo(String.class, list.item(0).getTextContent().trim()));
+ assertEquals("Hadrian", context.getTypeConverter().convertTo(String.class, list.item(1).getTextContent().trim()));
+ }
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- // START SNIPPET: example
- from("direct:start").transform(body().append(" World!")).to("mock:result");
- // END SNIPPET: example
+ from("direct:start")
+ .transform().xpath("//students/student")
+ .to("mock:result");
}
};
}
-}
+}
\ No newline at end of file
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java Thu May 7 10:42:40 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test to verify that error handling using async() also works as expected.
+ *
+ * @version $Revision$
+ */
+public class AsyncErrorHandlerTest extends ContextTestSupport {
+
+ public void testAsyncErrorHandler() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+
+ MockEndpoint mock = getMockEndpoint("mock:dead");
+ mock.expectedMessageCount(1);
+ mock.message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ mock.message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+
+ template.sendBody("direct:in", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).delay(0).logStackTrace(false));
+
+ from("direct:in")
+ .async(2)
+ .to("mock:foo")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ throw new Exception("Forced exception by unit test");
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java Thu May 7 10:42:40 2009
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncNoWaitRouteTest extends ContextTestSupport {
+
+ private String route = "";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ route = "";
+ }
+
+ public void testAsyncNoWaitRouteExchange() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+ // send an in out to the direct endpoint using the classic API
+ Exchange exchange = template.send("direct:start", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.setPattern(ExchangePattern.InOut);
+ exchange.getIn().setBody("Hello");
+ }
+ });
+
+ // we should run before the async processor that sets B
+ route += "A";
+
+ // as it turns into a async route later we get a Future in the IN body
+ Object out = exchange.getOut().getBody();
+ assertIsInstanceOf(Future.class, out);
+
+ // cast to future
+ Future future = (Future) out;
+
+ assertFalse("Should not be done", future.isDone());
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("AB", route);
+
+ // get the response from the future
+ String response = context.getTypeConverter().convertTo(String.class, future);
+ assertEquals("Bye World", response);
+ }
+
+ public void testAsyncNoWaitRoute() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+ // send a request reply to the direct start endpoint
+ Object out = template.requestBody("direct:start", "Hello");
+
+ // we should run before the async processor that sets B
+ route += "A";
+
+ // as it turns into a async route later we get a Future as response
+ assertIsInstanceOf(Future.class, out);
+
+ // cast to future
+ Future future = (Future) out;
+
+ assertFalse("Should not be done", future.isDone());
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("AB", route);
+
+ // get the response from the future
+ String response = context.getTypeConverter().convertTo(String.class, future);
+ assertEquals("Bye World", response);
+ }
+
+ public void testAsyncRouteNoWaitWithTypeConverted() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+ // send a request reply to the direct start endpoint, but will use
+ // future type converter that will wait for the response, even though the async
+ // is set to not wait. As the type converter will wait for us
+ String response = template.requestBody("direct:start", "Hello", String.class);
+
+ // we should wait for the async response as we ask for the result as a String body
+ route += "A";
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("Bye World", response);
+ assertEquals("BA", route);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // we start this route async
+ from("direct:start")
+ // we play a bit with the message
+ .transform(body().append(" World"))
+ // now turn the route into async from this point forward
+ // the caller will have a Future<Exchange> returned as response in OUT
+ // to be used to grap the async response when he fell like it
+ // we do not want to wait for tasks to be complete so we instruct Camel
+ // to not wait, and therefore Camel returns the Future<Exchange> handle we
+ // can use to get the result when we want
+ .async().waitForTaskToComplete(false)
+ // from this point forward this is the async route doing its work
+ // so we do a bit of delay to simulate heavy work that takes time
+ .to("mock:foo")
+ .delay(100)
+ // and we also work with the message so we can prepare a response
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ route += "B";
+ assertEquals("Hello World", exchange.getIn().getBody());
+ exchange.getOut().setBody("Bye World");
+ }
+ // and we use mocks for unit testing
+ }).to("mock:result");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java Thu May 7 10:42:40 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.concurrent.Future;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncRouteNoWaitWithErrorTest extends ContextTestSupport {
+
+ private String route = "";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ route = "";
+ }
+
+ public void testAsyncRouteWithError() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ // send a request reply to the direct start endpoint
+ Object out = template.requestBody("direct:start", "Hello");
+
+ // we should run before the async processor that sets B
+ route += "A";
+
+ // as it turns into a async route later we get a Future as response
+ assertIsInstanceOf(Future.class, out);
+
+ // cast to future
+ Future future = (Future) out;
+
+ assertFalse("Should not be done", future.isDone());
+
+ assertMockEndpointsSatisfied();
+
+ // get the response from the future
+ try {
+ String response = context.getTypeConverter().convertTo(String.class, future);
+ fail("Should have thrown an exception");
+ } catch (CamelExecutionException e) {
+ // expected an execution exception
+ assertEquals("Damn forced by unit test", e.getCause().getMessage());
+ }
+
+ assertEquals("AB", route);
+ }
+
+ public void testAsyncRouteWithTypeConverted() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ // send a request reply to the direct start endpoint, but will use
+ // future type converter that will wait for the response
+ try {
+ String response = template.requestBody("direct:start", "Hello", String.class);
+ fail("Should have thrown an exception");
+ } catch (CamelExecutionException e) {
+ // expected an execution exception
+ assertEquals("Damn forced by unit test", e.getCause().getMessage());
+ }
+
+ // we should wait for the async response as we ask for the result as a String body
+ route += "A";
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("BA", route);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // we start this route async
+ from("direct:start")
+ // we play a bit with the message
+ .transform(body().append(" World"))
+ // now turn the route into async from this point forward
+ // the caller will have a Future<Exchange> returned as response in OUT
+ // to be used to grap the async response when he fell like it
+ .async().waitForTaskToComplete(false)
+ // from this point forward this is the async route doing its work
+ // so we do a bit of delay to simulate heavy work that takes time
+ .to("mock:foo")
+ .delay(100)
+ // and we also work with the message so we can prepare a response
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ route += "B";
+ assertEquals("Hello World", exchange.getIn().getBody());
+ throw new IllegalArgumentException("Damn forced by unit test");
+ }
+ // and we use mocks for unit testing
+ }).to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java?rev=772598&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java Thu May 7 10:42:40 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncRouteTest extends ContextTestSupport {
+
+ private String route = "";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ route = "";
+ }
+
+ public void testAsyncRoute() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+ // send a request reply to the direct start endpoint
+ // it will wait for the async response so we get the full response
+ Object out = template.requestBody("direct:start", "Hello");
+
+ // we should run before the async processor that sets B
+ route += "A";
+
+ // as it turns into a async route later we get a Future as response
+ assertIsInstanceOf(String.class, out);
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("BA", route);
+
+ // get the response from the future
+ String response = context.getTypeConverter().convertTo(String.class, out);
+ assertEquals("Bye World", response);
+ }
+
+ public void testAsyncRouteWithTypeConverted() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+ // send a request reply to the direct start endpoint, but will use
+ // future type converter that will wait for the response
+ String response = template.requestBody("direct:start", "Hello", String.class);
+
+ // we should wait for the async response as we ask for the result as a String body
+ route += "A";
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("Bye World", response);
+ assertEquals("BA", route);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // we start this route async
+ from("direct:start")
+ // we play a bit with the message
+ .transform(body().append(" World"))
+ // now turn the route into async from this point forward
+ // the caller will have a Future<Exchange> returned as response in OUT
+ // to be used to grap the async response when he fell like it
+ .async()
+ // from this point forward this is the async route doing its work
+ // so we do a bit of delay to simulate heavy work that takes time
+ .to("mock:foo")
+ .delay(100)
+ // and we also work with the message so we can prepare a response
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ route += "B";
+ assertEquals("Hello World", exchange.getIn().getBody());
+ exchange.getOut().setBody("Bye World");
+ }
+ // and we use mocks for unit testing
+ }).to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date