You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/06 09:08:16 UTC
svn commit: r772076 [1/2] - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/component/seda/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/impl/converter/ ...
Author: davsclaus
Date: Wed May 6 07:08:14 2009
New Revision: 772076
URL: http://svn.apache.org/viewvc?rev=772076&view=rev
Log:
CAMEL-1572: First cut of new Async API. AsyncProcessor is @deprecated. Only MulticastProcessor needs to be migrated. Thread DSL will be replaced with a new Async DSL later, hence why its removed.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java
- copied, changed from r771551, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java (with props)
Removed:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue794Test.java
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/ThreadErrorHandlerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadSetErrorHandlerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadDefinition.scala
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/ThreadTest.scala
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java
camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java
camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java
camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SAbstractDefinition.scala
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/builder/RouteBuilder.scala
camel/trunk/components/camel-test/src/main/java/org/apache/camel/test/TestSupport.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java Wed May 6 07:08:14 2009
@@ -21,7 +21,7 @@
* The callback interface for an {@link AsyncProcessor} so that it can
* notify you when an {@link Exchange} has completed.
*
- * @deprecated a new async API is planned for Camel 2.0
+ * @deprecated a new async API is planned for Camel 2.0, will be removed in Camel 2.0 GA
*/
public interface AsyncCallback {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java Wed May 6 07:08:14 2009
@@ -20,13 +20,10 @@
/**
* A more complex version of {@link Processor} which supports asynchronous
- * processing of the {@link Exchange}. Any processor can be coerced to
- * have an {@link AsyncProcessor} interface by using the
- * {@link org.apache.camel.impl.converter.AsyncProcessorTypeConverter#convert AsyncProcessorTypeConverter.covert}
- * method.
+ * processing of the {@link Exchange}.
*
* @version $Revision$
- * @deprecated a new async API is planned for Camel 2.0
+ * @deprecated a new async API is planned for Camel 2.0, will be removed in Camel 2.0 GA
*/
public interface AsyncProcessor extends Processor {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Wed May 6 07:08:14 2009
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.model.RouteDefinition;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed May 6 07:08:14 2009
@@ -157,9 +157,7 @@
/**
* Returns the outbound message, lazily creating one if one has not already
- * been associated with this exchange. If you want to inspect this property
- * but not force lazy creation then invoke the {@link #getOut(boolean)}
- * method passing in <tt>false</tt>
+ * been associated with this exchange.
* <p/>
* If you want to test whether an OUT message have been set or not, use the {@link #hasOut()} method.
*
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/HeaderFilterStrategyAware.java Wed May 6 07:08:14 2009
@@ -27,6 +27,8 @@
*/
public interface HeaderFilterStrategyAware {
+ // TODO move this to SPI package
+
HeaderFilterStrategy getHeaderFilterStrategy();
void setHeaderFilterStrategy(HeaderFilterStrategy strategy);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Wed May 6 07:08:14 2009
@@ -17,6 +17,10 @@
package org.apache.camel;
import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutorService;
/**
* Template (named like Spring's TransactionTemplate & JmsTemplate
@@ -44,6 +48,9 @@
*/
public interface ProducerTemplate extends Service {
+ // Synchronous methods
+ // -----------------------------------------------------------------------
+
/**
* Sends the exchange to the default endpoint
*
@@ -132,18 +139,6 @@
Exchange send(String endpointUri, ExchangePattern pattern, Processor processor);
/**
- * Sends an exchange to an endpoint using a supplied processor
- *
- * @param endpointUri the endpoint URI to send the exchange to
- * @param processor the transformer used to populate the new exchange
- * {@link Processor} to populate the exchange.
- * @param callback the callback will be called when the exchange is completed.
- * @return the returned exchange
- * @deprecated a new async API is planned for Camel 2.0
- */
- Exchange send(String endpointUri, Processor processor, AsyncCallback callback);
-
- /**
* Sends the exchange to the given endpoint
*
* @param endpoint the endpoint to send the exchange to
@@ -175,18 +170,6 @@
Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor);
/**
- * Sends an exchange to an endpoint using a supplied processor
- *
- * @param endpoint the endpoint to send the exchange to
- * @param processor the transformer used to populate the new exchange
- * {@link Processor} to populate the exchange.
- * @param callback the callback will be called when the exchange is completed.
- * @return the returned exchange
- * @deprecated a new async API is planned for Camel 2.0
- */
- Exchange send(Endpoint endpoint, Processor processor, AsyncCallback callback);
-
- /**
* Send the body to an endpoint
*
* @param endpoint the endpoint to send the exchange to
@@ -407,6 +390,7 @@
* Uses an {@link ExchangePattern#InOut} message exchange pattern.
*
* @param body the payload to send
+ * @param type the expected response type
* @return the result (see class javadoc)
*/
<T> T requestBody(Object body, Class<T> type);
@@ -552,4 +536,137 @@
* @return the result (see class javadoc)
*/
<T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type);
+
+
+ // Asynchronous methods
+ // -----------------------------------------------------------------------
+
+ /**
+ * Sets the executor service to use for async messaging.
+ * <p/>
+ * If none provided Camel will default use a {@link java.util.concurrent.ScheduledExecutorService}
+ * with a pool of 5 threads.
+ *
+ * @param executorService the executor service.
+ */
+ void setExecutorService(ExecutorService executorService);
+
+ /**
+ * Sends an asynchronous exchange to the given endpoint.
+ *
+ * @param endpointUri the endpoint URI to send the exchange to
+ * @param exchange the exchange to send
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Exchange> asyncSend(String endpointUri, Exchange exchange);
+
+ /**
+ * Sends an asynchronous exchange to the given endpoint.
+ *
+ * @param endpointUri the endpoint URI to send the exchange to
+ * @param processor the transformer used to populate the new exchange
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Exchange> asyncSend(String endpointUri, Processor processor);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOnly} message exchange pattern.
+ *
+ * @param endpointUri the endpoint URI to send the exchange to
+ * @param body the body to send
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Object> asyncSendBody(String endpointUri, Object body);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpointUri the endpoint URI to send the exchange to
+ * @param body the body to send
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Object> asyncRequestBody(String endpointUri, Object body);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpointUri the endpoint URI to send the exchange to
+ * @param body the body to send
+ * @param header the header name
+ * @param headerValue the header value
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Object> asyncRequestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpointUri the endpoint URI to send the exchange to
+ * @param body the body to send
+ * @param headers headers
+ * @return a handle to be used to get the response in the future
+ */
+ Future<Object> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpointUri the endpoint URI to send the exchange to
+ * @param body the body to send
+ * @param type the expected response type
+ * @return a handle to be used to get the response in the future
+ */
+ <T> Future<T> asyncRequestBody(String endpointUri, Object body, Class<T> type);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpointUri the endpoint URI to send the exchange to
+ * @param body the body to send
+ * @param header the header name
+ * @param headerValue the header value
+ * @param type the expected response type
+ * @return a handle to be used to get the response in the future
+ */
+ <T> Future<T> asyncRequestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type);
+
+ /**
+ * Sends an asynchronous body to the given endpoint.
+ * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+ *
+ * @param endpointUri the endpoint URI to send the exchange to
+ * @param body the body to send
+ * @param headers headers
+ * @param type the expected response type
+ * @return a handle to be used to get the response in the future
+ */
+ <T> Future<T> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type);
+
+ /**
+ * Gets the response body from the future handle, will wait until the response is ready.
+ *
+ * @param future the handle to get the response
+ * @param type the expected response type
+ * @return the result (see class javadoc)
+ */
+ <T> T asyncExtractBody(Future future, Class<T> type);
+
+ /**
+ * Gets the response body from the future handle, will wait at most the given time for the response to be ready.
+ *
+ * @param future the handle to get the response
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @param type the expected response type
+ * @return the result (see class javadoc)
+ * @throws java.util.concurrent.TimeoutException if the wait timed out
+ */
+ <T> T asyncExtractBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java Wed May 6 07:08:14 2009
@@ -18,10 +18,9 @@
import java.util.Collection;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultProducer;
/**
@@ -29,7 +28,7 @@
*
* @version $Revision$
*/
-public class CollectionProducer extends DefaultProducer implements AsyncProcessor {
+public class CollectionProducer extends DefaultProducer implements Processor {
private final Collection<Exchange> queue;
public CollectionProducer(Endpoint endpoint, Collection<Exchange> queue) {
@@ -41,11 +40,4 @@
queue.add(exchange.copy());
}
- public boolean process(Exchange exchange, AsyncCallback callback) {
- Exchange copy = exchange.copy();
- copy.setProperty("CamelAsyncCallback", callback);
- queue.add(copy);
- callback.done(true);
- return true;
- }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed May 6 07:08:14 2009
@@ -22,13 +22,10 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,17 +38,17 @@
private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);
private SedaEndpoint endpoint;
- private AsyncProcessor processor;
+ private Processor processor;
private ExecutorService executor;
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
this.endpoint = endpoint;
- this.processor = AsyncProcessorTypeConverter.convert(processor);
+ this.processor = processor;
}
@Override
public String toString() {
- return "SedaConsumer: " + endpoint.getEndpointUri();
+ return "SedaConsumer[" + endpoint.getEndpointUri() + "]";
}
public void run() {
@@ -71,14 +68,6 @@
} catch (Exception e) {
LOG.error("Seda queue caught: " + e, e);
}
-
- // TODO: It ought to be UnitOfWork that did the callback notification but we are planning
- // to replace it with a brand new Async API so we leave it as is
- AsyncCallback callback = exchange.getProperty("CamelAsyncCallback", AsyncCallback.class);
- if (callback != null) {
- // seda consumer is async so invoke the async done on the callback if its provided
- callback.done(false);
- }
} else {
LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
try {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Wed May 6 07:08:14 2009
@@ -16,11 +16,9 @@
*/
package org.apache.camel.impl;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
@@ -32,7 +30,6 @@
public class DefaultConsumer extends ServiceSupport implements Consumer {
private final Endpoint endpoint;
private final Processor processor;
- private AsyncProcessor asyncProcessor;
private ExceptionHandler exceptionHandler;
public DefaultConsumer(Endpoint endpoint, Processor processor) {
@@ -42,7 +39,7 @@
@Override
public String toString() {
- return "Consumer on " + endpoint;
+ return "Consumer[" + endpoint.getEndpointUri() + "]";
}
public Endpoint getEndpoint() {
@@ -53,19 +50,6 @@
return processor;
}
- /**
- * Provides an {@link AsyncProcessor} interface to the configured
- * processor on the consumer. If the processor does not implement
- * the interface, it will be adapted so that it does.
- * @deprecated
- */
- public AsyncProcessor getAsyncProcessor() {
- if (asyncProcessor == null) {
- asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
- }
- return asyncProcessor;
- }
-
public ExceptionHandler getExceptionHandler() {
if (exceptionHandler == null) {
exceptionHandler = new LoggingExceptionHandler(getClass());
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Wed May 6 07:08:14 2009
@@ -18,8 +18,14 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -30,8 +36,8 @@
import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
/**
* A client helper object (named like Spring's TransactionTemplate & JmsTemplate
@@ -47,9 +53,16 @@
private final Map<String, Endpoint> endpointCache = new HashMap<String, Endpoint>();
private boolean useEndpointCache = true;
private Endpoint defaultEndpoint;
-
+ private ExecutorService executor;
+
public DefaultProducerTemplate(CamelContext context) {
this.context = context;
+ this.executor = new ScheduledThreadPoolExecutor(5);
+ }
+
+ public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
+ this.context = context;
+ this.executor = executor;
}
public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) {
@@ -60,7 +73,7 @@
public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
return new DefaultProducerTemplate(camelContext, endpoint);
- }
+ }
public Exchange send(String endpointUri, Exchange exchange) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
@@ -72,11 +85,6 @@
return send(endpoint, processor);
}
- public Exchange send(String endpointUri, Processor processor, AsyncCallback callback) {
- Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
- return send(endpoint, processor, callback);
- }
-
public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
return send(endpoint, pattern, processor);
@@ -91,10 +99,6 @@
return producerCache.send(endpoint, processor);
}
- public Exchange send(Endpoint endpoint, Processor processor, AsyncCallback callback) {
- return producerCache.send(endpoint, processor, callback);
- }
-
public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
return producerCache.send(endpoint, pattern, processor);
}
@@ -450,6 +454,7 @@
protected void doStop() throws Exception {
producerCache.stop();
endpointCache.clear();
+ executor.shutdown();
}
protected Object extractResultBody(Exchange result) {
@@ -460,4 +465,136 @@
return ExchangeHelper.extractResultBody(result, pattern);
}
+ public void setExecutorService(ExecutorService executorService) {
+ this.executor = executorService;
+ }
+
+ public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
+ Callable<Exchange> task = new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ return send(uri, exchange);
+ }
+ };
+
+ return executor.submit(task);
+ }
+
+ public Future<Exchange> asyncSend(final String uri, final Processor processor) {
+ Callable<Exchange> task = new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ return send(uri, processor);
+ }
+ };
+
+ return executor.submit(task);
+ }
+
+ public Future<Object> asyncSendBody(final String uri, final Object body) {
+ Callable<Object> task = new Callable<Object>() {
+ public Object call() throws Exception {
+ sendBody(uri, body);
+ // its InOnly, so no body to return
+ return null;
+ }
+ };
+
+ return executor.submit(task);
+ }
+
+ public Future<Object> asyncRequestBody(final String uri, final Object body) {
+ Callable<Object> task = new Callable<Object>() {
+ public Object call() throws Exception {
+ return requestBody(uri, body);
+ }
+ };
+
+ return executor.submit(task);
+ }
+
+ public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
+ Callable<T> task = new Callable<T>() {
+ public T call() throws Exception {
+ return requestBody(uri, body, type);
+ }
+ };
+
+ return executor.submit(task);
+ }
+
+ public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
+ Callable<Object> task = new Callable<Object>() {
+ public Object call() throws Exception {
+ return requestBodyAndHeader(endpointUri, body, header, headerValue);
+ }
+ };
+
+ return executor.submit(task);
+ }
+
+ public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
+ Callable<T> task = new Callable<T>() {
+ public T call() throws Exception {
+ return requestBodyAndHeader(endpointUri, body, header, headerValue, type);
+ }
+ };
+
+ return executor.submit(task);
+ }
+
+ public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
+ Callable<Object> task = new Callable<Object>() {
+ public Object call() throws Exception {
+ return requestBodyAndHeaders(endpointUri, body, headers);
+ }
+ };
+
+ return executor.submit(task);
+ }
+
+ public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
+ Callable<T> task = new Callable<T>() {
+ public T call() throws Exception {
+ return requestBodyAndHeaders(endpointUri, body, headers, type);
+ }
+ };
+
+ return executor.submit(task);
+ }
+
+
+ public <T> T asyncExtractBody(Future future, Class<T> type) {
+ try {
+ return doExtractBody(future.get(), type);
+ } catch (InterruptedException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } catch (ExecutionException e) {
+ // execution failed due to an exception so rethrow the cause
+ throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
+ }
+ }
+
+ public <T> T asyncExtractBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
+ try {
+ if (timeout > 0) {
+ return doExtractBody(future.get(timeout, unit), type);
+ } else {
+ return doExtractBody(future.get(), type);
+ }
+ } catch (InterruptedException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } catch (ExecutionException e) {
+ // execution failed due to an exception so rethrow the cause
+ throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
+ }
+ }
+
+ private <T> T doExtractBody(Object result, Class<T> type) {
+ if (result instanceof Exchange) {
+ Exchange exchange = (Exchange) result;
+ Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
+ return context.getTypeConverter().convertTo(type, answer);
+ }
+ return context.getTypeConverter().convertTo(type, result);
+ }
+
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Wed May 6 07:08:14 2009
@@ -19,21 +19,17 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
-
/**
* Cache containing created {@link Producer}.
*
@@ -91,26 +87,6 @@
}
}
- /**
- * Sends an exchange to an endpoint using a supplied
- * {@link Processor} to populate the exchange. The callback
- * will be called when the exchange is completed.
- *
- * @param endpoint the endpoint to send the exchange to
- * @param processor the transformer used to populate the new exchange
- */
- public Exchange send(Endpoint endpoint, Processor processor, AsyncCallback callback) {
- try {
- Producer producer = getProducer(endpoint);
- Exchange exchange = producer.createExchange();
- boolean sync = sendExchange(endpoint, producer, processor, exchange, callback);
- setProcessedSync(exchange, sync);
- return exchange;
- } catch (Exception e) {
- throw wrapRuntimeCamelException(e);
- }
- }
-
public static boolean isProcessedSync(Exchange exchange) {
Boolean rc = exchange.getProperty(Exchange.PROCESSED_SYNC, Boolean.class);
return rc == null ? false : rc;
@@ -152,17 +128,6 @@
return exchange;
}
- protected boolean sendExchange(Endpoint endpoint, Producer producer, Processor processor, Exchange exchange, AsyncCallback callback) throws Exception {
- // lets populate using the processor callback
- processor.process(exchange);
-
- // now lets dispatch
- if (LOG.isDebugEnabled()) {
- LOG.debug(">>>> " + endpoint + " " + exchange);
- }
- return AsyncProcessorTypeConverter.convert(producer).process(exchange, callback);
- }
-
protected void doStop() throws Exception {
ServiceHelper.stopServices(producers.values());
producers.clear();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java Wed May 6 07:08:14 2009
@@ -68,7 +68,6 @@
addFallbackTypeConverter(new EnumTypeConverter());
addFallbackTypeConverter(new ArrayTypeConverter());
addFallbackTypeConverter(new PropertyEditorTypeConverter());
- addFallbackTypeConverter(new AsyncProcessorTypeConverter());
}
public List<TypeConverterLoader> getTypeConverterLoaders() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Wed May 6 07:08:14 2009
@@ -688,34 +688,6 @@
}
/**
- * Causes subsequent processors to be called asynchronously
- *
- * @param coreSize the number of threads that will be used to process
- * messages in subsequent processors.
- * @return a ThreadType builder that can be used to further configure the
- * the thread pool.
- */
- public ThreadDefinition thread(int coreSize) {
- ThreadDefinition answer = new ThreadDefinition(coreSize);
- addOutput(answer);
- return answer;
- }
-
- /**
- * Causes subsequent processors to be called asynchronously
- *
- * @param executor the executor that will be used to process
- * messages in subsequent processors.
- * @return a ThreadType builder that can be used to further configure the
- * the thread pool.
- */
- public ProcessorDefinition<Type> thread(Executor executor) {
- ThreadDefinition answer = new ThreadDefinition(executor);
- addOutput(answer);
- return this;
- }
-
- /**
* <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
* Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
* to avoid duplicate messages
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed May 6 07:08:14 2009
@@ -50,7 +50,7 @@
*/
public class MulticastProcessor extends ServiceSupport implements Processor, Navigate {
- // TODO: Cleanup this when AsyncProcessor/AsyncCallback is replaced with new async API
+ // TODO: Use JDK CompletionService to get rid of the AsyncProcessor/AsyncCallback
static class ProcessorExchangePair {
private final Processor processor;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Wed May 6 07:08:14 2009
@@ -20,13 +20,9 @@
import java.util.Iterator;
import java.util.List;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,11 +33,9 @@
*
* @version $Revision$
*/
-public class Pipeline extends MulticastProcessor implements AsyncProcessor {
+public class Pipeline extends MulticastProcessor implements Processor {
private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
- // TODO: Cleanup this when AsyncProcessor/AsyncCallback is replaced with new async API
-
public Pipeline(Collection<Processor> processors) {
super(processors);
}
@@ -56,18 +50,37 @@
}
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- public boolean process(Exchange original, AsyncCallback callback) {
Iterator<Processor> processors = getProcessors().iterator();
- Exchange nextExchange = original;
+ Exchange nextExchange = exchange;
boolean first = true;
- while (true) {
+
+ while (continueRouting(processors, nextExchange)) {
+ if (first) {
+ first = false;
+ } else {
+ // prepare for next run
+ nextExchange = createNextExchange(nextExchange);
+ }
+
+ // get the next processor
+ Processor processor = processors.next();
+
+ // process the next exchange
+ try {
+ if (LOG.isTraceEnabled()) {
+ // this does the actual processing so log at trace level
+ LOG.trace("Processing exchangeId: " + nextExchange.getExchangeId() + " >>> " + nextExchange);
+ }
+ processor.process(nextExchange);
+ } catch (Exception e) {
+ nextExchange.setException(e);
+ }
+
+ // check for error if so we should break out
boolean exceptionHandled = hasExceptionBeenHandled(nextExchange);
if (nextExchange.isFailed() || exceptionHandled) {
// The Exchange.EXCEPTION_HANDLED property is only set if satisfactory handling was done
- // by the error handler. It's still an exception, the exchange still failed.
+ // by the error handler. It's still an exception, the exchange still failed.
if (LOG.isDebugEnabled()) {
LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange
+ " exception: " + nextExchange.getException() + " fault: "
@@ -76,86 +89,19 @@
}
break;
}
-
- // should we continue routing or not
- if (!continueRouting(processors, nextExchange)) {
- break;
- }
-
- AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
-
- if (first) {
- first = false;
- } else {
- nextExchange = createNextExchange(processor, nextExchange);
- }
-
- boolean sync = process(original, nextExchange, callback, processors, processor);
- // Continue processing the pipeline synchronously ...
- if (!sync) {
- // The pipeline will be completed async...
- return false;
- }
}
- // If we get here then the pipeline was processed entirely
- // synchronously.
if (LOG.isTraceEnabled()) {
// logging nextExchange as it contains the exchange that might have altered the payload and since
// we are logging the completion if will be confusing if we log the original instead
// we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
- LOG.trace("Processing compelete for exchangeId: " + original.getExchangeId() + " >>> " + nextExchange);
- }
- ExchangeHelper.copyResults(original, nextExchange);
- callback.done(true);
- return true;
- }
-
- private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, AsyncProcessor processor) {
- if (LOG.isTraceEnabled()) {
- // this does the actual processing so log at trace level
- LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
+ LOG.trace("Processing compelete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange);
}
- return processor.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- // We only have to handle async completion of the pipeline..
- if (sync) {
- return;
- }
-
- // Continue processing the pipeline...
- Exchange nextExchange = exchange;
- while (continueRouting(processors, nextExchange)) {
- AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
-
- boolean exceptionHandled = hasExceptionBeenHandled(nextExchange);
- if (nextExchange.isFailed() || exceptionHandled) {
- // The Exchange.EXCEPTION_HANDLED property is only set if satisfactory handling was done
- // by the error handler. It's still an exception, the exchange still failed.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange
- + " exception: " + nextExchange.getException() + " fault: "
- + (nextExchange.hasFault() ? nextExchange.getFault() : null)
- + (exceptionHandled ? " handled by the error handler" : ""));
- }
- break;
- }
-
- nextExchange = createNextExchange(processor, nextExchange);
- sync = process(original, nextExchange, callback, processors, processor);
- if (!sync) {
- return;
- }
- }
-
- ExchangeHelper.copyResults(original, nextExchange);
- callback.done(false);
- }
- });
+ // copy results back to the original exchange
+ ExchangeHelper.copyResults(exchange, nextExchange);
}
-
private static boolean hasExceptionBeenHandled(Exchange nextExchange) {
return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.EXCEPTION_HANDLED));
}
@@ -165,11 +111,10 @@
* <p/>
* Remember to copy the original exchange id otherwise correlation of ids in the log is a problem
*
- * @param producer the producer used to send to the endpoint
* @param previousExchange the previous exchange
* @return a new exchange
*/
- protected Exchange createNextExchange(Processor producer, Exchange previousExchange) {
+ protected Exchange createNextExchange(Exchange previousExchange) {
Exchange answer = previousExchange.newInstance();
// we must use the same id as this is a snapshot strategy where Camel copies a snapshot
// before processing the next step in the pipeline, so we have a snapshot of the exchange
@@ -212,4 +157,5 @@
public String toString() {
return "Pipeline" + getProcessors();
}
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java Wed May 6 07:08:14 2009
@@ -16,11 +16,12 @@
*/
package org.apache.camel.util;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
/**
* Helper methods for AsyncProcessor objects.
@@ -32,21 +33,21 @@
}
/**
- * Calls the async version of the processor's process method and waits
- * for it to complete before returning. This can be used by AsyncProcessor
- * objects to implement their sync version of the process method.
+ * Processes the exchange async.
+ *
+ * @param executor executor service
+ * @param processor the processor
+ * @param exchange the exchange
+ * @return a future handle for the task being executed asynchronously
*/
- public static void process(AsyncProcessor processor, Exchange exchange) throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
- boolean sync = processor.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- if (!sync) {
- latch.countDown();
- }
+ public static Future<Exchange> asyncProcess(final ExecutorService executor, final Processor processor, final Exchange exchange) {
+ Callable<Exchange> task = new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ processor.process(exchange);
+ return exchange;
}
- });
- if (!sync) {
- latch.await();
- }
+ };
+
+ return executor.submit(task);
}
}
Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Wed May 6 07:08:14 2009
@@ -60,7 +60,6 @@
SortDefinition
SplitDefinition
StopDefinition
-ThreadDefinition
ThrottleDefinition
ThrowFaultDefinition
ToDefinition
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java Wed May 6 07:08:14 2009
@@ -26,7 +26,6 @@
import org.apache.camel.builder.ValueBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.processor.DelegateAsyncProcessor;
import org.apache.camel.processor.DelegateProcessor;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.PredicateAssertHelper;
@@ -351,9 +350,7 @@
*/
protected Processor unwrap(Processor processor) {
while (true) {
- if (processor instanceof DelegateAsyncProcessor) {
- processor = ((DelegateAsyncProcessor)processor).getProcessor();
- } else if (processor instanceof DelegateProcessor) {
+ if (processor instanceof DelegateProcessor) {
processor = ((DelegateProcessor)processor).getProcessor();
} else {
return processor;
@@ -371,9 +368,6 @@
while (true) {
if (processor instanceof Channel) {
return (Channel) processor;
- }
- if (processor instanceof DelegateAsyncProcessor) {
- processor = ((DelegateAsyncProcessor)processor).getProcessor();
} else if (processor instanceof DelegateProcessor) {
processor = ((DelegateProcessor)processor).getProcessor();
} else {
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java (from r771551, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java&r1=771551&r2=772076&rev=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProducerTest.java Wed May 6 07:08:14 2009
@@ -16,56 +16,31 @@
*/
package org.apache.camel.component.seda;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
-import org.apache.camel.Producer;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
/**
- * Async processor with seda to simulate the caller thread is not blocking while the
- * exchange is processed and we get callbacks when the exchange is complete.
+ * The new Async API version of doing async routing based on the old AsyncProcessor API
+ * In the old SedaAsyncProcessorTest a seda endpoint was needed to really turn it into async. This is not
+ * needed by the new API so we send it using direct instead.
*
* @version $Revision$
*/
-public class SedaAsyncProcessorTest extends ContextTestSupport {
+public class SedaAsyncProducerTest extends ContextTestSupport {
- private CountDownLatch latchAsync = new CountDownLatch(1);
- private boolean doneSync;
private String route = "";
- public void testAsyncWithSeda() throws Exception {
+ public void testAsyncProducer() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
- Endpoint endpoint = context.getEndpoint("seda:start");
- Producer producer = endpoint.createProducer();
-
- Exchange exchange = producer.createExchange(ExchangePattern.InOut);
- exchange.getIn().setBody("Hello World");
-
- // seda producer is async also (but this is ugly to need to cast)
- AsyncProcessor async = (AsyncProcessor) producer;
-
- boolean sync = async.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- // we expect 2 callbacks
- // the first is when we have finished sending to the seda producer
- if (sync) {
- doneSync = true;
- } else {
- latchAsync.countDown();
- }
- }
- });
+ // using the new async API we can fire a real async message
+ Future<String> future = template.asyncRequestBody("direct:start", "Hello World", String.class);
// I should happen before mock
route = route + "send";
@@ -73,16 +48,10 @@
assertMockEndpointsSatisfied();
assertEquals("Send should occur before processor", "sendprocess", route);
- assertTrue("Sync done should have occured", doneSync);
- // wait at most 2 seconds
- boolean zero = latchAsync.await(2, TimeUnit.SECONDS);
- assertTrue("Async done should have occured", zero);
-
- // how to get the response?
- String response = exchange.getOut().getBody(String.class);
- // TODO: we need a new API for getting the result
- //assertEquals("Bye World", response);
+ // and get the response with the future handle
+ String response = future.get();
+ assertEquals("Bye World", response);
}
@Override
@@ -92,7 +61,7 @@
public void configure() throws Exception {
errorHandler(noErrorHandler());
- from("seda:start").delay(100)
+ from("direct:start").delay(100)
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
route = route + "process";
@@ -105,4 +74,4 @@
}
};
}
-}
+}
\ No newline at end of file
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java?rev=772076&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java Wed May 6 07:08:14 2009
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.AsyncProcessorHelper;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultProducerTemplateAsyncTest extends ContextTestSupport {
+
+ public void testRequestAsync() throws Exception {
+ Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setBody("Hello");
+
+ Future<Exchange> future = template.asyncSend("direct:start", exchange);
+ long start = System.currentTimeMillis();
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ Exchange result = future.get();
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ assertEquals("Hello World", result.getOut().getBody());
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSendAsyncProcessor() throws Exception {
+ Future<Exchange> future = template.asyncSend("direct:start", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody("Hello");
+ }
+ });
+ long start = System.currentTimeMillis();
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ Exchange result = future.get();
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ assertEquals("Hello World", result.getOut().getBody());
+ }
+
+ public void testRequestAsyncBody() throws Exception {
+ Future<Object> future = template.asyncRequestBody("direct:start", "Hello");
+ long start = System.currentTimeMillis();
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ // we can use extract body to convert to expect body type
+ String result = template.asyncExtractBody(future, String.class);
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ assertEquals("Hello World", result);
+ }
+
+ public void testRequestAsyncBodyType() throws Exception {
+ Future<String> future = template.asyncRequestBody("direct:start", "Hello", String.class);
+ long start = System.currentTimeMillis();
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ // or we can use parameter type in the requestBody method so the future handle know its type
+ String result = future.get();
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ assertEquals("Hello World", result);
+ }
+
+ public void testRequestAsyncBodyAndHeader() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedHeaderReceived("foo", 123);
+
+ Future<Object> future = template.asyncRequestBodyAndHeader("direct:start", "Hello", "foo", 123);
+ long start = System.currentTimeMillis();
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ // we can use extract body to convert to expect body type
+ String result = template.asyncExtractBody(future, String.class);
+
+ assertMockEndpointsSatisfied();
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ assertEquals("Hello World", result);
+ }
+
+ public void testRequestAsyncBodyAndHeaderType() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedHeaderReceived("foo", 123);
+
+ Future<String> future = template.asyncRequestBodyAndHeader("direct:start", "Hello", "foo", 123, String.class);
+ long start = System.currentTimeMillis();
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ // or we can use parameter type in the requestBody method so the future handle know its type
+ String result = future.get();
+
+ assertMockEndpointsSatisfied();
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ assertEquals("Hello World", result);
+ }
+
+ public void testRequestAsyncBodyAndHeaders() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedHeaderReceived("foo", 123);
+ mock.expectedHeaderReceived("bar", "cheese");
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put("foo", 123);
+ headers.put("bar", "cheese");
+ Future<Object> future = template.asyncRequestBodyAndHeaders("direct:start", "Hello", headers);
+ long start = System.currentTimeMillis();
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ // we can use extract body to convert to expect body type
+ String result = template.asyncExtractBody(future, String.class);
+
+ assertMockEndpointsSatisfied();
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ assertEquals("Hello World", result);
+ }
+
+ public void testRequestAsyncBodyAndHeadersType() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedHeaderReceived("foo", 123);
+ mock.expectedHeaderReceived("bar", "cheese");
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put("foo", 123);
+ headers.put("bar", "cheese");
+ Future<String> future = template.asyncRequestBodyAndHeaders("direct:start", "Hello", headers, String.class);
+ long start = System.currentTimeMillis();
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ // or we can use parameter type in the requestBody method so the future handle know its type
+ String result = future.get();
+
+ assertMockEndpointsSatisfied();
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ assertEquals("Hello World", result);
+ }
+
+ public void testRequestAsyncErrorWhenProcessing() throws Exception {
+ Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setBody("Hello");
+
+ Future<Object> future = template.asyncRequestBody("direct:error", exchange);
+ long start = System.currentTimeMillis();
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ try {
+ Exchange result = template.asyncExtractBody(future, Exchange.class);
+ fail("Should have thrown exception");
+ } catch (RuntimeCamelException e) {
+ assertEquals("Damn forced by unit test", e.getCause().getMessage());
+ }
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ }
+
+ public void testRequestAsyncBodyErrorWhenProcessing() throws Exception {
+ Future<Object> future = template.asyncRequestBody("direct:error", "Hello");
+ long start = System.currentTimeMillis();
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ try {
+ String result = template.asyncExtractBody(future, String.class);
+ fail("Should have thrown exception");
+ } catch (RuntimeCamelException e) {
+ assertEquals("Damn forced by unit test", e.getCause().getMessage());
+ }
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ }
+
+ public void testAsyncProcessWithClassicAPI() throws Exception {
+ Endpoint endpoint = context.getEndpoint("direct:start");
+ Exchange exchange = endpoint.createExchange();
+ exchange.getIn().setBody("Hello");
+
+ long start = System.currentTimeMillis();
+
+ // produce it async so we use a helper
+ Producer producer = endpoint.createProducer();
+ // normally you will use a shared exectutor service with pools
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ // send it async with the help of this helper
+ Future<Exchange> future = AsyncProcessorHelper.asyncProcess(executor, producer, exchange);
+
+ // you can do other stuff
+ String echo = template.requestBody("direct:echo", "Hi", String.class);
+ assertEquals("HiHi", echo);
+
+ String result = template.asyncExtractBody(future, String.class);
+
+ assertMockEndpointsSatisfied();
+
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take longer than: " + delta, delta > 250);
+ assertEquals("Hello World", result);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .delay(250)
+ .transform(body().append(" World")).to("mock:result");
+
+ from("direct:error")
+ .delay(250)
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ throw new IllegalArgumentException("Damn forced by unit test");
+ }
+ });
+
+ from("direct:echo").transform(body().append(body()));
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Wed May 6 07:08:14 2009
@@ -16,8 +16,6 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -78,7 +76,7 @@
}
protected RouteBuilder createRouteBuilder() {
- final Processor processor = new AsyncProcessor() {
+ final Processor processor = new Processor() {
public void process(Exchange exchange) {
Integer counter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
int attempt = (counter == null) ? 1 : counter + 1;
@@ -87,24 +85,6 @@
+ " being less than: " + failUntilAttempt);
}
}
- // START SNIPPET: AsyncProcessor
- public boolean process(Exchange exchange, AsyncCallback callback) {
- Integer counter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
- int attempt = (counter == null) ? 1 : counter + 1;
- if (attempt > 1) {
- assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(),
- "Camel DeadLetterChannel Redeliver Timer");
- }
-
- if (attempt < failUntilAttempt) {
- // we can't throw the exception here , or the callback will not be invoked.
- exchange.setException(new RuntimeException("Failed to process due to attempt: " + attempt
- + " being less than: " + failUntilAttempt));
- }
- callback.done(false);
- return false;
- }
- // END SNIPPET: AsyncProcessor
};
return new RouteBuilder() {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineConcurrentTest.java Wed May 6 07:08:14 2009
@@ -30,6 +30,8 @@
*/
public class PipelineConcurrentTest extends ContextTestSupport {
+ private String uri = "seda:in?size=10000&concurrentConsumers=10";
+
public void testConcurrentPipeline() throws Exception {
int total = 10000;
final int group = total / 20;
@@ -51,7 +53,7 @@
} catch (InterruptedException e) {
// ignore
}
- template.sendBody("seda:in?size=10000", "" + (start + i));
+ template.sendBody(uri, "" + (start + i));
}
}
});
@@ -67,8 +69,7 @@
// to force any exceptions coming forward immediately
errorHandler(noErrorHandler());
- from("seda:in?size=10000")
- .thread(10)
+ from(uri)
.pipeline("direct:do", "mock:result");
from("direct:do")
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java Wed May 6 07:08:14 2009
@@ -94,7 +94,7 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("direct:async").thread(1).to("direct:foo");
+ from("seda:async").to("direct:foo");
from("direct:foo").process(new Processor() {
public void process(Exchange exchange) throws Exception {
log.info("Received: " + exchange);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java Wed May 6 07:08:14 2009
@@ -22,7 +22,7 @@
public class UnitOfWorkWithAsyncFlowTest extends UnitOfWorkTest {
@Override
protected void setUp() throws Exception {
- uri = "direct:async";
+ uri = "seda:async";
super.setUp();
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java Wed May 6 07:08:14 2009
@@ -42,8 +42,6 @@
@Override
public void configure() throws Exception {
// TODO: Should also work with DLC
- // will be possible when we remove the AsyncProcessor so the processing logic
- // is much easier to deal with
// errorHandler(deadLetterChannel("mock:dead").disableRedelivery());
onException(MyTechnicalException.class)
Modified: camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java (original)
+++ camel/trunk/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatDataSetTest.java Wed May 6 07:08:14 2009
@@ -55,7 +55,7 @@
DataFormat jaxb = new JaxbDataFormat("org.apache.camel.example");
// use 5 concurrent threads to do marshalling
- from("dataset:beer").thread(5).marshal(jaxb).to("dataset:beer");
+ from("dataset:beer").marshal(jaxb).to("dataset:beer");
}
};
}
Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java Wed May 6 07:08:14 2009
@@ -22,7 +22,6 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.component.http.CamelServlet;
import org.apache.camel.component.http.HttpConsumer;
import org.apache.camel.component.http.HttpExchange;
@@ -56,30 +55,30 @@
if (continuation.isNew()) {
// Have the camel process the HTTP exchange.
- final HttpExchange exchange = new HttpExchange(consumer.getEndpoint(), request, response);
- boolean sync = consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- if (sync) {
- return;
- }
- continuation.setObject(exchange);
- continuation.resume();
- }
- });
+ // final HttpExchange exchange = new HttpExchange(consumer.getEndpoint(), request, response);
+ // boolean sync = consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
+ // public void done(boolean sync) {
+ // if (sync) {
+ // return;
+ // }
+ // continuation.setObject(exchange);
+ // continuation.resume();
+ // }
+ //});
- if (!sync) {
+ //if (!sync) {
// Wait for the exchange to get processed.
// This might block until it completes or it might return via an exception and
// then this method is re-invoked once the the exchange has finished processing
- continuation.suspend(0);
- }
+ // continuation.suspend(0);
+ //}
// HC: The getBinding() is interesting because it illustrates the
// impedance miss-match between HTTP's stream oriented protocol, and
// Camels more message oriented protocol exchanges.
// now lets output to the response
- consumer.getBinding().writeResponse(exchange, response);
+ //consumer.getBinding().writeResponse(exchange, response);
return;
}
Modified: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java (original)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/MultiThreadedHttpGetTest.java Wed May 6 07:08:14 2009
@@ -37,7 +37,7 @@
// so the stream has to be read to the end. When this happens
// the associated connection is released automatically.
- String endpointName = "seda:withConversion";
+ String endpointName = "seda:withConversion?concurrentConsumers=5";
sendMessagesTo(endpointName, 5);
}
@@ -50,7 +50,7 @@
context.getComponent("http", HttpComponent.class).getHttpConnectionManager().getParams()
.setDefaultMaxConnectionsPerHost(5);
- String endpointName = "seda:withoutConversion";
+ String endpointName = "seda:withoutConversion?concurrentConsumers=5";
sendMessagesTo(endpointName, 5);
}
@@ -63,7 +63,7 @@
for (int i = 0; i < 5; i++) {
mockEndpoint.expectedMessageCount(1);
- template.sendBody("seda:withoutConversion", null);
+ template.sendBody("seda:withoutConversion?concurrentConsumers=5", null);
mockEndpoint.assertIsSatisfied();
Object response = mockEndpoint.getReceivedExchanges().get(0).getIn().getBody();
InputStream responseStream = assertIsInstanceOf(InputStream.class, response);
@@ -95,10 +95,10 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() {
- from("seda:withConversion").thread(5).to("http://localhost:5430/search")
+ from("seda:withConversion?concurrentConsumers=5").to("http://localhost:5430/search")
.convertBodyTo(String.class).to("mock:results");
- from("seda:withoutConversion").thread(5).to("http://localhost:5430/search")
+ from("seda:withoutConversion?concurrentConsumers=5").to("http://localhost:5430/search")
.to("mock:results");
from("jetty:http://localhost:5430/search").process(new Processor() {
Modified: camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java (original)
+++ camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java Wed May 6 07:08:14 2009
@@ -19,7 +19,6 @@
import java.util.Random;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.DeadLetterChannelBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -29,6 +28,8 @@
*/
public class XQueryConcurrencyTest extends ContextTestSupport {
+ private String uri = "seda:in?concurrentConsumers=5";
+
public void testConcurrency() throws Exception {
int total = 1000;
@@ -51,7 +52,7 @@
} catch (InterruptedException e) {
// ignore
}
- template.sendBody("seda:in", "<person><id>" + (start + i) + "</id><name>James</name></person>");
+ template.sendBody(uri, "<person><id>" + (start + i) + "</id><name>James</name></person>");
}
}
});
@@ -67,8 +68,7 @@
// no retry as we want every failure to submerge
errorHandler(noErrorHandler());
- from("seda:in")
- .thread(5)
+ from(uri)
.transform().xquery("/person/id", String.class)
.to("mock:result");
}
Modified: camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java (original)
+++ camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryURLBasedConcurrencyTest.java Wed May 6 07:08:14 2009
@@ -71,8 +71,9 @@
// no retry as we want every failure to submerge
errorHandler(noErrorHandler());
- from("direct:start")
- .thread(5)
+ from("direct:start").to("seda:foo?concurrentConsumers=5");
+
+ from("seda:foo?concurrentConsumers=5")
.to("xquery:org/apache/camel/component/xquery/transform.xquery")
.to("mock:result");
}
Modified: camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala?rev=772076&r1=772075&r2=772076&view=diff
==============================================================================
--- camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala (original)
+++ camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala Wed May 6 07:08:14 2009
@@ -44,7 +44,6 @@
def setbody(expression: Exchange => Any) : DSL
def setheader(header: String, expression: Exchange => Any) : DSL
def split(expression: Exchange => Any) : SSplitDefinition
- def thread(number: Int) : SThreadDefinition
def throttle(frequency: Frequency) : SThrottleDefinition
def to(uris: String*) : DSL
def unmarshal(format: DataFormatDefinition) : DSL