You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/07 08:31:43 UTC

svn commit: r931444 [1/2] - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/bean/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/management/mbean...

Author: davsclaus
Date: Wed Apr  7 06:31:41 2010
New Revision: 931444

URL: http://svn.apache.org/viewvc?rev=931444&view=rev
Log:
CAMEL-2558: ProducerTemplate and ConsumerTemplate can now have their max cache size configured. Also, CamelContext now returns already started templates from its create template methods.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateWithCustomCacheMaxSizeTest.java   (with props)
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/ConsumerTemplateMaximumCacheSizeTest.java   (with props)
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/ProducerTemplateMaximumCacheSizeTest.java
      - copied, changed from r931425, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/ProducerTemplateAutoRegisterTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/config/ConsumerTemplateMaximumCacheSizeTest-context.xml   (with props)
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/config/ProducerTemplateMaximumCacheSizeTest-context.xml
      - copied, changed from r931425, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/config/ProducerTemplateAutoRegisterTest-context.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerServicePool.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ServicePool.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/CamelContextHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomProducerServicePoolTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitRouteNumberOfProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionProcessorInspectCausedExceptionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionProcessorInspectCausedExceptionWithDefaultErrorHandlerTest.java
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
    camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerTest.java
    camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/transport/CamelJBIClientProxyTest.java
    camel/trunk/components/camel-guice/src/main/java/org/apache/camel/guice/Main.java
    camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcEndpointTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutWithSpringRestartIssueTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsRedeliveryWithInitialRedeliveryDelayTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/TransactionErrorHandlerRedeliveryDelayTest.java
    camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
    camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
    camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelConsumerTemplateFactoryBean.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelProducerTemplateFactoryBean.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/CustomProcessorWithNamespacesTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/RoutingUsingCamelContextFactoryTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CamelContextAutoStartupTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/scan/SpringComponentScanTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/scan/SpringComponentScanWithDeprecatedPackagesTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/PojoDualCamelContextConsumerTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/ContainerWideInterceptorTest.java
    camel/trunk/components/camel-velocity/src/test/java/org/apache/camel/component/velocity/VelocityDynamicTemplateTest.java
    camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Wed Apr  7 06:31:41 2010
@@ -79,6 +79,8 @@ public interface CamelContext extends Se
 
     /**
      * Adds a service, starting it so that it will be stopped with this context
+     * <p/>
+     * The added service will also be enlisted in JMX for management (if JMX is enabled)
      *
      * @param object the service
      * @throws Exception can be thrown when starting the service
@@ -460,28 +462,62 @@ public interface CamelContext extends Se
     List<String> getLanguageNames();
 
     /**
-     * Creates a new {@link ProducerTemplate} which is <b>not</b> started.
+     * Creates a new {@link ProducerTemplate} which is <b>started</b> and therefore ready to use right away.
+     * <p/>
+     * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
+     * Why does Camel use too many threads with ProducerTemplate?</a>
+     * <p/>
+     * Will use cache size defined in Camel property with key {@link Exchange#MAXIMUM_CACHE_POOL_SIZE}.
+     * If no key was defined then it will fall back to a default size of 1000.
+     * You can also use the {@link org.apache.camel.ProducerTemplate#setMaximumCacheSize(int)} method to use a custom value
+     * before starting the template.
+     *
+     * @return the template
+     * @throws Exception is thrown if error starting the template
+     */
+    ProducerTemplate createProducerTemplate() throws Exception;
+
+    /**
+     * Creates a new {@link ProducerTemplate} which is <b>started</b> and therefore ready to use right away.
      * <p/>
      * You <b>must</b> start the template before its being used.
      * <p/>
      * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
      * Why does Camel use too many threads with ProducerTemplate?</a>
      *
+     * @param maximumCacheSize the maximum cache size
      * @return the template
+     * @throws Exception is thrown if error starting the template
      */
-    ProducerTemplate createProducerTemplate();
+    ProducerTemplate createProducerTemplate(int maximumCacheSize) throws Exception;
 
     /**
-     * Creates a new {@link ConsumerTemplate} which is <b>not</b> started.
+     * Creates a new {@link ConsumerTemplate} which is <b>started</b> and therefore ready to use right away.
      * <p/>
-     * You <b>must</b> start the template before its being used.
+     * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
+     * Why does Camel use too many threads with ProducerTemplate?</a> as it also applies for ConsumerTemplate.
+     * <p/>
+     * Will use cache size defined in Camel property with key {@link Exchange#MAXIMUM_CACHE_POOL_SIZE}.
+     * If no key was defined then it will fall back to a default size of 1000.
+     * You can also use the {@link org.apache.camel.ConsumerTemplate#setMaximumCacheSize(int)} method to use a custom value
+     * before starting the template.
+     *
+     * @return the template
+     * @throws Exception is thrown if error starting the template
+     */
+    ConsumerTemplate createConsumerTemplate() throws Exception;
+
+    /**
+     * Creates a new {@link ConsumerTemplate} which is <b>started</b> and therefore ready to use right away.
      * <p/>
      * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
      * Why does Camel use too many threads with ProducerTemplate?</a> as it also applies for ConsumerTemplate.
      *
+     * @param maximumCacheSize the maximum cache size
      * @return the template
+     * @throws Exception is thrown if error starting the template
      */
-    ConsumerTemplate createConsumerTemplate();
+    ConsumerTemplate createConsumerTemplate(int maximumCacheSize) throws Exception;
 
     /**
      * Adds the given interceptor strategy

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java Wed Apr  7 06:31:41 2010
@@ -46,6 +46,33 @@ package org.apache.camel;
  */
 public interface ConsumerTemplate extends Service {
 
+    // Configuration methods
+    // -----------------------------------------------------------------------
+
+    /**
+     * Gets the maximum cache size used.
+     *
+     * @return the maximum cache size
+     */
+    int getMaximumCacheSize();
+
+    /**
+     * Sets a custom maximum cache size.
+     *
+     * @param maximumCacheSize the custom maximum cache size
+     */
+    void setMaximumCacheSize(int maximumCacheSize);
+
+    /**
+     * Gets an approximated size of the current cached resources in the backing cache pools.
+     *
+     * @return the size of current cached resources
+     */
+    int getCurrentCacheSize();
+
+    // Synchronous methods
+    // -----------------------------------------------------------------------
+
     /**
      * Receives from the endpoint, waiting until there is a response
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Apr  7 06:31:41 2010
@@ -92,7 +92,8 @@ public interface Exchange {
     String LOOP_INDEX               = "CamelLoopIndex";
     String LOOP_SIZE                = "CamelLoopSize";
 
-    String MULTICAST_INDEX = "CamelMulticastIndex";
+    String MAXIMUM_CACHE_POOL_SIZE = "CamelMaximumCachePoolSize";
+    String MULTICAST_INDEX         = "CamelMulticastIndex";
 
     String ON_COMPLETION = "CamelOnCompletion";
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Wed Apr  7 06:31:41 2010
@@ -30,7 +30,7 @@ import org.apache.camel.spi.Synchronizat
  * {@link Exchange} to an {@link Endpoint}.
  * <p/>
  * <b>All</b> methods throws {@link RuntimeCamelException} if processing of
- * the {@link Exchange} failed and an Exception occured. The <tt>getCause</tt>
+ * the {@link Exchange} failed and an Exception occurred. The <tt>getCause</tt>
  * method on {@link RuntimeCamelException} returns the wrapper original caused
  * exception.
  * <p/>
@@ -50,6 +50,30 @@ import org.apache.camel.spi.Synchronizat
  */
 public interface ProducerTemplate extends Service {
 
+    // Configuration methods
+    // -----------------------------------------------------------------------
+
+    /**
+     * Gets the maximum cache size used in the backing cache pools.
+     *
+     * @return the maximum cache size
+     */
+    int getMaximumCacheSize();
+
+    /**
+     * Sets a custom maximum cache size to use in the backing cache pools.
+     *
+     * @param maximumCacheSize the custom maximum cache size
+     */
+    void setMaximumCacheSize(int maximumCacheSize);
+
+    /**
+     * Gets an approximated size of the current cached resources in the backing cache pools.
+     *
+     * @return the size of current cached resources
+     */
+    int getCurrentCacheSize();
+
     // Synchronous methods
     // -----------------------------------------------------------------------
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java Wed Apr  7 06:31:41 2010
@@ -45,7 +45,7 @@ import java.lang.annotation.Target;
 public @interface RecipientList {
     String context() default "";
     String delimiter() default ",";
-    boolean parallelProcessoing() default false;
+    boolean parallelProcessing() default false;
     boolean stopOnException() default false;
     String strategyRef() default "";
     String executorServiceRef() default "";

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java Wed Apr  7 06:31:41 2010
@@ -36,6 +36,7 @@ import org.apache.camel.processor.Recipi
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -60,6 +61,8 @@ public class MethodInfo {
     private ExchangePattern pattern = ExchangePattern.InOut;
     private RecipientList recipientList;
 
+    // TODO: This class should extend ServiceSupport so we can clean up recipientList when stopping
+
     public MethodInfo(CamelContext camelContext, Class<?> type, Method method, List<ParameterInfo> parameters, List<ParameterInfo> bodyParameters,
                       boolean hasCustomAnnotation, boolean hasHandlerAnnotation, boolean voidAsInOnly) {
         this.camelContext = camelContext;
@@ -85,16 +88,16 @@ public class MethodInfo {
 
             org.apache.camel.RecipientList annotation = method.getAnnotation(org.apache.camel.RecipientList.class);
 
-            recipientList = new RecipientList(annotation.delimiter());
+            recipientList = new RecipientList(camelContext, annotation.delimiter());
             recipientList.setStopOnException(annotation.stopOnException());
-            recipientList.setParallelProcessing(annotation.parallelProcessoing());
+            recipientList.setParallelProcessing(annotation.parallelProcessing());
 
             if (ObjectHelper.isNotEmpty(annotation.executorServiceRef())) {
                 ExecutorService executor = CamelContextHelper.mandatoryLookup(camelContext, annotation.executorServiceRef(), ExecutorService.class);
                 recipientList.setExecutorService(executor);
             }
 
-            if (annotation.parallelProcessoing() && recipientList.getExecutorService() == null) {
+            if (annotation.parallelProcessing() && recipientList.getExecutorService() == null) {
                 // we are running in parallel so we need a thread pool
                 ExecutorService executor = camelContext.getExecutorServiceStrategy().newDefaultThreadPool(this, "@RecipientList");
                 recipientList.setExecutorService(executor);
@@ -140,6 +143,10 @@ public class MethodInfo {
                 }
                 Object result = invoke(method, pojo, arguments, exchange);
                 if (recipientList != null) {
+                    // ensure it's started
+                    if (!recipientList.isStarted()) {
+                        ServiceHelper.startService(recipientList);
+                    }
                     recipientList.sendToRecipientList(exchange, result);
                     // we don't want to return the list of endpoints
                     // return Void to indicate to BeanProcessor that there is no reply

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java Wed Apr  7 06:31:41 2010
@@ -165,14 +165,28 @@ public class CamelPostProcessorHelper im
     protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
         // endpoint is optional for this injection point
         Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, false);
-        return new DefaultProducerTemplate(getCamelContext(), endpoint);
+        ProducerTemplate answer = new DefaultProducerTemplate(getCamelContext(), endpoint);
+        // start the template so it's ready to use
+        try {
+            answer.start();
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+        return answer;
     }
 
     /**
      * Factory method to create a {@link org.apache.camel.ConsumerTemplate} to be injected into a POJO
      */
     protected ConsumerTemplate createInjectionConsumerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
-        return new DefaultConsumerTemplate(getCamelContext());
+        ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext());
+        // start the template so it's ready to use
+        try {
+            answer.start();
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+        return answer;
     }
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java Wed Apr  7 06:31:41 2010
@@ -18,11 +18,13 @@ package org.apache.camel.impl;
 
 import java.util.Map;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.FailedToCreateConsumerException;
 import org.apache.camel.IsSingleton;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.LRUCache;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
@@ -35,14 +37,19 @@ import org.apache.commons.logging.LogFac
  */
 public class ConsumerCache extends ServiceSupport {
     private static final transient Log LOG = LogFactory.getLog(ConsumerCache.class);
-
+    private final CamelContext camelContext;
     private final Map<String, PollingConsumer> consumers;
 
-    public ConsumerCache() {
-        this.consumers = new LRUCache<String, PollingConsumer>(1000);
+    public ConsumerCache(CamelContext camelContext) {
+        this(camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
+    }
+
+    public ConsumerCache(CamelContext camelContext, int maximumCacheSize) {
+        this(camelContext, new LRUCache<String, PollingConsumer>(maximumCacheSize));
     }
 
-    public ConsumerCache(Map<String, PollingConsumer> cache) {
+    public ConsumerCache(CamelContext camelContext, Map<String, PollingConsumer> cache) {
+        this.camelContext = camelContext;
         this.consumers = cache;
     }
 
@@ -59,7 +66,7 @@ public class ConsumerCache extends Servi
 
             boolean singleton = true;
             if (answer instanceof IsSingleton) {
-                singleton = ((IsSingleton)answer).isSingleton();
+                singleton = ((IsSingleton) answer).isSingleton();
             }
 
             if (singleton) {
@@ -103,12 +110,13 @@ public class ConsumerCache extends Servi
         return consumer.receiveNoWait();
     }
 
-    protected void doStop() throws Exception {
-        ServiceHelper.stopServices(consumers.values());
-        consumers.clear();
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(consumers);
     }
 
-    protected void doStart() throws Exception {
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(consumers);
+        consumers.clear();
     }
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Wed Apr  7 06:31:41 2010
@@ -89,6 +89,7 @@ import org.apache.camel.spi.RouteStartup
 import org.apache.camel.spi.ServicePool;
 import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.LRUCache;
@@ -501,7 +502,7 @@ public class DefaultCamelContext extends
     /**
      * Strategy to add the given endpoint to the internal endpoint registry
      *
-     * @param uri  uri of endpoint
+     * @param uri      uri of endpoint
      * @param endpoint the endpoint to add
      * @return the added endpoint
      */
@@ -936,12 +937,30 @@ public class DefaultCamelContext extends
         this.delay = delay;
     }
 
-    public ProducerTemplate createProducerTemplate() {
-        return new DefaultProducerTemplate(this);
+    public ProducerTemplate createProducerTemplate() throws Exception {
+        int size = CamelContextHelper.getMaximumCachePoolSize(this);
+        return createProducerTemplate(size);
     }
 
-    public ConsumerTemplate createConsumerTemplate() {
-        return new DefaultConsumerTemplate(this);
+    public ProducerTemplate createProducerTemplate(int maximumCacheSize) throws Exception {
+        DefaultProducerTemplate answer = new DefaultProducerTemplate(this);
+        answer.setMaximumCacheSize(maximumCacheSize);
+        // start it so it's ready to use
+        answer.start();
+        return answer;
+    }
+
+    public ConsumerTemplate createConsumerTemplate() throws Exception {
+        int size = CamelContextHelper.getMaximumCachePoolSize(this);
+        return createConsumerTemplate(size);
+    }
+
+    public ConsumerTemplate createConsumerTemplate(int maximumCacheSize) throws Exception {
+        DefaultConsumerTemplate answer = new DefaultConsumerTemplate(this);
+        answer.setMaximumCacheSize(maximumCacheSize);
+        // start it so it's ready to use
+        answer.start();
+        return answer;
     }
 
     public ErrorHandlerBuilder getErrorHandlerBuilder() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java Wed Apr  7 06:31:41 2010
@@ -16,42 +16,50 @@
  */
 package org.apache.camel.impl;
 
-
 import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.ServiceHelper;
 
 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
 
 /**
  * @version $Revision$
  */
-public class DefaultConsumerTemplate implements ConsumerTemplate {
+public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerTemplate {
 
     private final CamelContext context;
-    private final ConsumerCache consumerCache = new ConsumerCache();
+    private ConsumerCache consumerCache;
+    private int maximumCacheSize;
 
     public DefaultConsumerTemplate(CamelContext context) {
         this.context = context;
     }
 
-    public void start() throws Exception {
-        consumerCache.start();
+    public int getMaximumCacheSize() {
+        return maximumCacheSize;
+    }
+
+    public void setMaximumCacheSize(int maximumCacheSize) {
+        this.maximumCacheSize = maximumCacheSize;
     }
 
-    public void stop() throws Exception {
-        consumerCache.stop();
+    public int getCurrentCacheSize() {
+        if (consumerCache == null) {
+            return 0;
+        }
+        return consumerCache.size();
     }
-    
+
     public CamelContext getCamelContext() {
         return context;
     }
 
     public Exchange receive(String endpointUri) {
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
-        return consumerCache.receive(endpoint);
+        return getConsumerCache().receive(endpoint);
     }
 
     public Exchange receive(Endpoint endpoint) {
@@ -60,7 +68,7 @@ public class DefaultConsumerTemplate imp
 
     public Exchange receive(String endpointUri, long timeout) {
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
-        return consumerCache.receive(endpoint, timeout);
+        return getConsumerCache().receive(endpoint, timeout);
     }
 
     public Exchange receive(Endpoint endpoint, long timeout) {
@@ -69,7 +77,7 @@ public class DefaultConsumerTemplate imp
 
     public Exchange receiveNoWait(String endpointUri) {
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
-        return consumerCache.receiveNoWait(endpoint);
+        return getConsumerCache().receiveNoWait(endpoint);
     }
 
     public Exchange receiveNoWait(Endpoint endpoint) {
@@ -162,4 +170,28 @@ public class DefaultConsumerTemplate imp
         }
         return answer;
     }
+
+    private ConsumerCache getConsumerCache() {
+        if (!isStarted()) {
+            throw new IllegalStateException("ConsumerTemplate has not been started");
+        }
+        return consumerCache;
+    }
+
+    protected void doStart() throws Exception {
+        if (consumerCache == null) {
+            if (maximumCacheSize > 0) {
+                consumerCache = new ConsumerCache(context, maximumCacheSize);
+            } else {
+                consumerCache = new ConsumerCache(context);
+            }
+        }
+        ServiceHelper.startService(consumerCache);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(consumerCache);
+        consumerCache = null;
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerServicePool.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerServicePool.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerServicePool.java Wed Apr  7 06:31:41 2010
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.impl;
 
-import java.util.concurrent.BlockingQueue;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.Producer;
 
@@ -35,12 +33,4 @@ public class DefaultProducerServicePool 
         super(capacity);
     }
 
-    public synchronized int size() {
-        int size = 0;
-        for (BlockingQueue<Producer> queue : pool.values()) {
-            size += queue.size();
-        }
-        return size;
-    }
-
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Wed Apr  7 06:31:41 2010
@@ -38,26 +38,21 @@ import org.apache.camel.util.ObjectHelpe
 import org.apache.camel.util.ServiceHelper;
 
 /**
- * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
- * et al) for working with Camel and sending {@link org.apache.camel.Message} instances in an
- * {@link org.apache.camel.Exchange} to an {@link org.apache.camel.Endpoint}.
- *
  * @version $Revision$
  */
 public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
     private final CamelContext context;
-    private final ProducerCache producerCache;
+    private ProducerCache producerCache;
     private Endpoint defaultEndpoint;
     private ExecutorService executor;
+    private int maximumCacheSize;
 
     public DefaultProducerTemplate(CamelContext context) {
         this.context = context;
-        this.producerCache = new ProducerCache(context);
     }
 
     public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
         this.context = context;
-        this.producerCache = new ProducerCache(context);
         this.executor = executor;
     }
 
@@ -71,6 +66,21 @@ public class DefaultProducerTemplate ext
         return new DefaultProducerTemplate(camelContext, endpoint);
     }
 
+    public int getMaximumCacheSize() {
+        return maximumCacheSize;
+    }
+
+    public void setMaximumCacheSize(int maximumCacheSize) {
+        this.maximumCacheSize = maximumCacheSize;
+    }
+
+    public int getCurrentCacheSize() {
+        if (producerCache == null) {
+            return 0;
+        }
+        return producerCache.size();
+    }
+
     public Exchange send(String endpointUri, Exchange exchange) {
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
         return send(endpoint, exchange);
@@ -87,16 +97,16 @@ public class DefaultProducerTemplate ext
     }
 
     public Exchange send(Endpoint endpoint, Exchange exchange) {
-        producerCache.send(endpoint, exchange);
+        getProducerCache().send(endpoint, exchange);
         return exchange;
     }
 
     public Exchange send(Endpoint endpoint, Processor processor) {
-        return producerCache.send(endpoint, processor);
+        return getProducerCache().send(endpoint, processor);
     }
 
     public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
-        return producerCache.send(endpoint, pattern, processor);
+        return getProducerCache().send(endpoint, pattern, processor);
     }
 
     public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) {
@@ -680,7 +690,7 @@ public class DefaultProducerTemplate ext
     public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) {
         Callable<Exchange> task = new Callable<Exchange>() {
             public Exchange call() throws Exception {
-                Exchange answer = producerCache.send(endpoint, processor);
+                Exchange answer = getProducerCache().send(endpoint, processor);
 
                 // invoke callback before returning answer
                 // as it allows callback to be used without UnitOfWorkProcessor invoking it
@@ -701,7 +711,21 @@ public class DefaultProducerTemplate ext
         return executor.submit(task);
     }
 
+    private ProducerCache getProducerCache() {
+        if (!isStarted()) {
+            throw new IllegalStateException("ProducerTemplate has not been started");
+        }
+        return producerCache;
+    }
+
     protected void doStart() throws Exception {
+        if (producerCache == null) {
+            if (maximumCacheSize > 0) {
+                producerCache = new ProducerCache(context, maximumCacheSize);
+            } else {
+                producerCache = new ProducerCache(context);
+            }
+        }
         ServiceHelper.startService(producerCache);
         if (executor == null) {
             executor = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "ProducerTemplate");
@@ -710,6 +734,8 @@ public class DefaultProducerTemplate ext
 
     protected void doStop() throws Exception {
         ServiceHelper.stopService(producerCache);
+        producerCache = null;
+
         if (executor != null) {
             context.getExecutorServiceStrategy().shutdownNow(executor);
             executor = null;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java Wed Apr  7 06:31:41 2010
@@ -46,6 +46,14 @@ public abstract class DefaultServicePool
         this.capacity = capacity;
     }
 
+    public synchronized int size() {
+        int size = 0;
+        for (BlockingQueue<Service> entry : pool.values()) {
+            size += entry.size();
+        }
+        return size;
+    }
+
     public synchronized Service addAndAcquire(Key key, Service service) {
         BlockingQueue<Service> entry = pool.get(key);
         if (entry == null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java Wed Apr  7 06:31:41 2010
@@ -323,14 +323,14 @@ public abstract class MainSupport extend
      * Returns a {@link org.apache.camel.ProducerTemplate} from the Spring {@link org.springframework.context.ApplicationContext} instances
      * or lazily creates a new one dynamically
      */
-    public ProducerTemplate getCamelTemplate() {
+    public ProducerTemplate getCamelTemplate() throws Exception {
         if (camelTemplate == null) {
             camelTemplate = findOrCreateCamelTemplate();
         }
         return camelTemplate;
     }
 
-    protected abstract ProducerTemplate findOrCreateCamelTemplate();
+    protected abstract ProducerTemplate findOrCreateCamelTemplate() throws Exception;
 
     protected abstract Map<String, CamelContext> getCamelContextMap();
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Wed Apr  7 06:31:41 2010
@@ -28,11 +28,13 @@ import org.apache.camel.Producer;
 import org.apache.camel.ProducerCallback;
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.spi.ServicePool;
+import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.LRUCache;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
 
 /**
@@ -43,22 +45,24 @@ import static org.apache.camel.util.Obje
 public class ProducerCache extends ServiceSupport {
     private static final transient Log LOG = LogFactory.getLog(ProducerCache.class);
 
-    private final Map<String, Producer> producers;
-    private final ServicePool<Endpoint, Producer> pool;
-    private final CamelContext context;
+    // TODO: Expose this cache for management in JMX (also ConsumerCache)
+    // TODO: Add source information so we know who uses this cache
+    // TODO: Add purge operation to purge the cache
 
-    // TODO: Have easy configuration of pooling in Camel
+    private final CamelContext camelContext;
+    private final ServicePool<Endpoint, Producer> pool;
+    private final Map<String, Producer> producers;
 
-    public ProducerCache(CamelContext context) {
-        this(context, context.getProducerServicePool());
+    public ProducerCache(CamelContext camelContext) {
+        this(camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
     }
 
-    public ProducerCache(CamelContext context, ServicePool<Endpoint, Producer> producerServicePool) {
-        this(context, producerServicePool, new LRUCache<String, Producer>(1000));
+    public ProducerCache(CamelContext camelContext, int cacheSize) {
+        this(camelContext, camelContext.getProducerServicePool(), new LRUCache<String, Producer>(cacheSize));
     }
 
-    public ProducerCache(CamelContext context, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
-        this.context = context;
+    public ProducerCache(CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
+        this.camelContext = camelContext;
         this.pool = producerServicePool;
         this.producers = cache;
     }
@@ -258,23 +262,23 @@ public class ProducerCache extends Servi
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(producers);
+        ServiceHelper.stopServices(producers, pool);
         producers.clear();
-
-        ServiceHelper.stopServices(pool);
     }
 
     protected void doStart() throws Exception {
-        ServiceHelper.startServices(pool);
+        ServiceHelper.startServices(pool, producers);
     }
 
     /**
-     * Returns the current size of the producer cache
+     * Returns the current size of the cache
      *
      * @return the current size
      */
     int size() {
-        return producers.size();
+        int size = producers.size();
+        size += pool.size();
+        return size;
     }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java Wed Apr  7 06:31:41 2010
@@ -135,15 +135,22 @@ public class ManagedCamelContext {
     @ManagedOperation(description = "Send body (in only)")
     public void sendBody(String endpointUri, String body) throws Exception {
         ProducerTemplate template = context.createProducerTemplate();
-        template.sendBody(endpointUri, body);
-        template.stop();
+        try {
+            template.sendBody(endpointUri, body);
+        } finally {
+            template.stop();
+        }
     }
 
     @ManagedOperation(description = "Request body (in out)")
     public Object requestBody(String endpointUri, String body) throws Exception {
         ProducerTemplate template = context.createProducerTemplate();
-        Object answer = template.requestBody(endpointUri, body);
-        template.stop();
+        Object answer = null;
+        try {
+            answer = template.requestBody(endpointUri, body);
+        } finally {
+            template.stop();
+        }
         return answer;
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Wed Apr  7 06:31:41 2010
@@ -85,9 +85,9 @@ public class RecipientListDefinition<Typ
 
         RecipientList answer;
         if (delimiter != null) {
-            answer = new RecipientList(expression, delimiter);
+            answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter);
         } else {
-            answer = new RecipientList(expression);
+            answer = new RecipientList(routeContext.getCamelContext(), expression);
         }
         answer.setAggregationStrategy(createAggregationStrategy(routeContext));
         answer.setParallelProcessing(isParallelProcessing());

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java Wed Apr  7 06:31:41 2010
@@ -69,7 +69,7 @@ public class RoutingSlipDefinition exten
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         ObjectHelper.notEmpty(getHeaderName(), "headerName", this);
         ObjectHelper.notEmpty(getUriDelimiter(), "uriDelimiter", this);
-        return new RoutingSlip(getHeaderName(), getUriDelimiter());
+        return new RoutingSlip(routeContext.getCamelContext(), getHeaderName(), getUriDelimiter());
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Wed Apr  7 06:31:41 2010
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
@@ -34,6 +35,9 @@ import org.apache.camel.processor.aggreg
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+import static org.apache.camel.util.ObjectHelper.notNull;
 
 /**
  * Implements a dynamic <a
@@ -44,6 +48,7 @@ import org.apache.camel.util.ObjectHelpe
  * @version $Revision$
  */
 public class RecipientList extends ServiceSupport implements Processor {
+    private final CamelContext camelContext;
     private ProducerCache producerCache;
     private Expression expression;
     private final String delimiter;
@@ -52,23 +57,28 @@ public class RecipientList extends Servi
     private ExecutorService executorService;
     private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
 
-    public RecipientList() {
+    public RecipientList(CamelContext camelContext) {
         // use comma by default as delimiter
-        this.delimiter = ",";
+        this(camelContext, ",");
     }
 
-    public RecipientList(String delimiter) {
+    public RecipientList(CamelContext camelContext, String delimiter) {
+        notNull(camelContext, "camelContext");
+        ObjectHelper.notEmpty(delimiter, "delimiter");
+        this.camelContext = camelContext;
         this.delimiter = delimiter;
     }
 
-    public RecipientList(Expression expression) {
+    public RecipientList(CamelContext camelContext, Expression expression) {
         // use comma by default as delimiter
-        this(expression, ",");
+        this(camelContext, expression, ",");
     }
 
-    public RecipientList(Expression expression, String delimiter) {
+    public RecipientList(CamelContext camelContext, Expression expression, String delimiter) {
+        notNull(camelContext, "camelContext");
         ObjectHelper.notNull(expression, "expression");
         ObjectHelper.notEmpty(delimiter, "delimiter");
+        this.camelContext = camelContext;
         this.expression = expression;
         this.delimiter = delimiter;
     }
@@ -79,6 +89,10 @@ public class RecipientList extends Servi
     }
 
     public void process(Exchange exchange) throws Exception {
+        if (!isStarted()) {
+            throw new IllegalStateException("RecipientList has not been started: " + this);
+        }
+
         Object receipientList = expression.evaluate(exchange, Object.class);
         sendToRecipientList(exchange, receipientList);
     }
@@ -91,7 +105,6 @@ public class RecipientList extends Servi
 
         // we should acquire and release the producers we need so we can leverage the producer
         // cache to the fullest
-        ProducerCache cache = getProducerCache(exchange);
         Map<Endpoint, Producer> producers = new LinkedHashMap<Endpoint, Producer>();
         try {
             List<Processor> processors = new ArrayList<Processor>();
@@ -99,7 +112,7 @@ public class RecipientList extends Servi
                 Object recipient = iter.next();
                 Endpoint endpoint = resolveEndpoint(exchange, recipient);
                 // acquire producer which we then release later
-                Producer producer = cache.acquireProducer(endpoint);
+                Producer producer = producerCache.acquireProducer(endpoint);
                 processors.add(producer);
                 producers.put(endpoint, producer);
             }
@@ -112,20 +125,11 @@ public class RecipientList extends Servi
         } finally {
             // and release the producers back to the producer cache
             for (Map.Entry<Endpoint, Producer> entry : producers.entrySet()) {
-                cache.releaseProducer(entry.getKey(), entry.getValue());
+                producerCache.releaseProducer(entry.getKey(), entry.getValue());
             }
         }
     }
 
-    protected ProducerCache getProducerCache(Exchange exchange) throws Exception {
-        // setup producer cache as we need to use the pluggable service pool defined on camel context
-        if (producerCache == null) {
-            this.producerCache = new ProducerCache(exchange.getContext());
-            this.producerCache.start();
-        }
-        return this.producerCache;
-    }
-
     protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
         // trim strings as end users might have added spaces between separators
         if (recipient instanceof String) {
@@ -135,15 +139,16 @@ public class RecipientList extends Servi
     }
 
     protected void doStart() throws Exception {
-        if (producerCache != null) {
-            producerCache.start();
+        if (producerCache == null) {
+            producerCache = new ProducerCache(camelContext);
+            // add it as a service so we can manage it
+            camelContext.addService(producerCache);
         }
+        ServiceHelper.startService(producerCache);
     }
 
     protected void doStop() throws Exception {
-        if (producerCache != null) {
-            producerCache.stop();
-        }
+        ServiceHelper.stopService(producerCache);
     }
 
     public boolean isParallelProcessing() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java Wed Apr  7 06:31:41 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -28,6 +29,7 @@ import org.apache.camel.impl.ProducerCac
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.model.RoutingSlipDefinition;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -43,15 +45,18 @@ public class RoutingSlip extends Service
     private ProducerCache producerCache;
     private final String header;
     private final String uriDelimiter;
+    private final CamelContext camelContext;
 
-    public RoutingSlip(String header) {
-        this(header, RoutingSlipDefinition.DEFAULT_DELIMITER);
+    public RoutingSlip(CamelContext camelContext, String header) {
+        this(camelContext, header, RoutingSlipDefinition.DEFAULT_DELIMITER);
     }
 
-    public RoutingSlip(String header, String uriDelimiter) {
+    public RoutingSlip(CamelContext camelContext, String header, String uriDelimiter) {
+        notNull(camelContext, "camelContext");
         notNull(header, "header");
         notNull(uriDelimiter, "uriDelimiter");
 
+        this.camelContext = camelContext;
         this.header = header;
         this.uriDelimiter = uriDelimiter;
     }
@@ -66,6 +71,10 @@ public class RoutingSlip extends Service
     }
 
     public void process(Exchange exchange) throws Exception {
+        if (!isStarted()) {
+            throw new IllegalStateException("RoutingSlip has not been started: " + this);
+        }
+
         Message message = exchange.getIn();
         String[] recipients = recipients(message);
         Exchange current = exchange;
@@ -78,7 +87,7 @@ public class RoutingSlip extends Service
             copyOutToIn(copy, current);
 
             try {                
-                getProducerCache(exchange).doInProducer(endpoint, copy, null, new ProducerCallback<Object>() {
+                producerCache.doInProducer(endpoint, copy, null, new ProducerCallback<Object>() {
                     public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception {
                         // set property which endpoint we send to
                         exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
@@ -125,29 +134,21 @@ public class RoutingSlip extends Service
         return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
     }
     
-    protected ProducerCache getProducerCache(Exchange exchange) throws Exception {
-        // setup producer cache as we need to use the pluggable service pool defined on camel context
-        if (producerCache == null) {
-            this.producerCache = new ProducerCache(exchange.getContext());
-            this.producerCache.start();
-        }
-        return this.producerCache;
-    }
-
     protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
         return ExchangeHelper.resolveEndpoint(exchange, recipient);
     }
 
     protected void doStart() throws Exception {
-        if (producerCache != null) {
-            producerCache.start();
+        if (producerCache == null) {
+            producerCache = new ProducerCache(camelContext);
+            // add it as a service so we can manage it
+            camelContext.addService(producerCache);
         }
+        ServiceHelper.startService(producerCache);
     }
 
     protected void doStop() throws Exception {
-        if (producerCache != null) {
-            producerCache.stop();
-        }
+        ServiceHelper.stopService(producerCache);
     }
 
     private void updateRoutingSlip(Exchange current) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Wed Apr  7 06:31:41 2010
@@ -85,9 +85,8 @@ public class SendAsyncProcessor extends 
     public Exchange doProcess(Exchange exchange) throws Exception {
         // now we are done, we should have a API callback for this
         // send the exchange to the destination using a producer
-        final ProducerCache cache = getProducerCache(exchange);
         // acquire the producer from the service pool
-        final Producer producer = cache.acquireProducer(destination);
+        final Producer producer = producerCache.acquireProducer(destination);
         ObjectHelper.notNull(producer, "producer");
 
         // pass in the callback that adds the exchange to the completed list of tasks
@@ -101,7 +100,7 @@ public class SendAsyncProcessor extends 
                 } finally {
                     // must return the producer to service pool when we are done
                     try {
-                        cache.releaseProducer(destination, producer);
+                        producerCache.releaseProducer(destination, producer);
                     } catch (Exception e) {
                         LOG.warn("Error releasing producer: " + producer + ". This exception will be ignored.", e);
                     }
@@ -282,7 +281,7 @@ public class SendAsyncProcessor extends 
         super.doStart();
 
         if (poolSize <= 0) {
-            throw new IllegalArgumentException("PoolSize must be a positive number");
+            throw new IllegalArgumentException("PoolSize must be a positive number, was: " + poolSize);
         }
 
         for (int i = 0; i < poolSize; i++) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Wed Apr  7 06:31:41 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -26,6 +27,7 @@ import org.apache.camel.impl.InterceptSe
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -36,6 +38,7 @@ import org.apache.commons.logging.LogFac
  */
 public class SendProcessor extends ServiceSupport implements Processor, Traceable {
     protected static final transient Log LOG = LogFactory.getLog(SendProcessor.class);
+    protected final CamelContext camelContext;
     protected ProducerCache producerCache;
     protected Endpoint destination;
     protected ExchangePattern pattern;
@@ -44,6 +47,8 @@ public class SendProcessor extends Servi
     public SendProcessor(Endpoint destination) {
         ObjectHelper.notNull(destination, "destination");
         this.destination = destination;
+        this.camelContext = destination.getCamelContext();
+        ObjectHelper.notNull(this.camelContext, "camelContext");
     }
 
     public SendProcessor(Endpoint destination, ExchangePattern pattern) {
@@ -90,8 +95,12 @@ public class SendProcessor extends Servi
      * @return the exchange that was processed
      */
     public Exchange doProcess(final Exchange exchange) throws Exception {
+        if (!isStarted()) {
+            throw new IllegalStateException("SendProcessor has not been started: " + this);
+        }
+
         // send the exchange to the destination using a producer
-        return getProducerCache(exchange).doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
+        return producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
             public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
                 exchange = configureExchange(exchange, pattern);
                 producer.process(exchange);
@@ -100,15 +109,6 @@ public class SendProcessor extends Servi
         });
     }
 
-    protected ProducerCache getProducerCache(Exchange exchange) throws Exception {
-        // setup producer cache as we need to use the pluggable service pool defined on camel context
-        if (producerCache == null) {
-            producerCache = new ProducerCache(exchange.getContext());
-            producerCache.start();
-        }
-        return producerCache;
-    }
-
     public Endpoint getDestination() {
         return destination;
     }
@@ -127,15 +127,16 @@ public class SendProcessor extends Servi
     }
 
     protected void doStart() throws Exception {
-        if (producerCache != null) {
-            producerCache.start();
+        if (producerCache == null) {
+            producerCache = new ProducerCache(camelContext);
+            // add it as a service so we can manage it
+            camelContext.addService(producerCache);
         }
+        ServiceHelper.startService(producerCache);
     }
 
     protected void doStop() throws Exception {
-        if (producerCache != null) {
-            producerCache.stop();
-        }
+        ServiceHelper.stopService(producerCache);
     }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Wed Apr  7 06:31:41 2010
@@ -19,7 +19,6 @@ package org.apache.camel.processor;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -37,8 +36,6 @@ import org.apache.camel.util.ObjectHelpe
  * @version $Revision$
  */
 public class WireTapProcessor extends SendProcessor {
-
-    private final CamelContext camelContext;
     private final ExecutorService executorService;
 
     // expression or processor used for populating a new exchange to send
@@ -50,14 +47,12 @@ public class WireTapProcessor extends Se
         super(destination);
         ObjectHelper.notNull(executorService, "executorService");
         this.executorService = executorService;
-        this.camelContext = destination.getCamelContext();
     }
 
     public WireTapProcessor(Endpoint destination, ExchangePattern pattern, ExecutorService executorService) {
         super(destination, pattern);
         ObjectHelper.notNull(executorService, "executorService");
         this.executorService = executorService;
-        this.camelContext = destination.getCamelContext();
     }
 
     @Override
@@ -71,7 +66,7 @@ public class WireTapProcessor extends Se
     }
 
     public void process(Exchange exchange) throws Exception {
-        getProducerCache(exchange).doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
+        producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
             public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
                 Exchange wireTapExchange = configureExchange(exchange, pattern);
                 processWireTap(producer, wireTapExchange);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Apr  7 06:31:41 2010
@@ -598,7 +598,7 @@ public class AggregateProcessor extends 
 
         }
 
-        ServiceHelper.startService(aggregationRepository);
+        ServiceHelper.startServices(processor, aggregationRepository);
 
         // should we use recover checker
         if (aggregationRepository instanceof RecoverableAggregationRepository) {
@@ -629,7 +629,7 @@ public class AggregateProcessor extends 
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(timeoutMap, recoverService, aggregationRepository);
+        ServiceHelper.stopServices(timeoutMap, recoverService, aggregationRepository, processor);
 
         if (closedCorrelationKeys != null) {
             closedCorrelationKeys.clear();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ServicePool.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ServicePool.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ServicePool.java Wed Apr  7 06:31:41 2010
@@ -21,15 +21,18 @@ package org.apache.camel.spi;
  * <p/>
  * Services that is capable of being pooled should implement the marker interface
  * {@link org.apache.camel.ServicePoolAware}.
+ * <p/>
+ * Notice the capacity is <b>per key</b> which means that each key can contain at most
+ * (the capacity) services. The pool can contain an unbounded number of keys.
  *
  * @version $Revision$
  */
-public interface ServicePool<Key, Service> extends org.apache.camel.Service {
+public interface ServicePool<Key, Service> {
 
     /**
      * Adds the given service to the pool and acquires it.
      *
-     * @param key the key
+     * @param key     the key
      * @param service the service
      * @return the acquired service, is newer <tt>null</tt>
      */
@@ -37,7 +40,7 @@ public interface ServicePool<Key, Servic
 
     /**
      * Tries to acquire the service with the given key
-     * 
+     *
      * @param key the key
      * @return the acquired service, or <tt>null</tt> if no free in pool
      */
@@ -46,9 +49,16 @@ public interface ServicePool<Key, Servic
     /**
      * Releases the service back to the pool
      *
-     * @param key  the key
+     * @param key     the key
      * @param service the service
      */
     void release(Key key, Service service);
 
+    /**
+     * Returns the current size of the pool
+     *
+     * @return the current size of the pool
+     */
+    int size();
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/CamelContextHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/CamelContextHelper.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/CamelContextHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/CamelContextHelper.java Wed Apr  7 06:31:41 2010
@@ -18,6 +18,7 @@ package org.apache.camel.util;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.NoSuchEndpointException;
 
 import static org.apache.camel.util.ObjectHelper.isEmpty;
@@ -154,4 +155,28 @@ public final class CamelContextHelper {
         return endpoint;
     }
 
+    /**
+     * Gets the maximum cache pool size.
+     * <p/>
+     * Will use the property set on CamelContext with the key {@link Exchange#MAXIMUM_CACHE_POOL_SIZE}.
+     * If no property has been set, then it will fall back to returning a size of 1000.
+     *
+     * @param camelContext the camel context
+     * @return the maximum cache size
+     * @throws IllegalArgumentException is thrown if the property is illegal
+     */
+    public static int getMaximumCachePoolSize(CamelContext camelContext) throws IllegalArgumentException {
+        String s = camelContext.getProperties().get(Exchange.MAXIMUM_CACHE_POOL_SIZE);
+        if (s != null) {
+            Integer size = camelContext.getTypeConverter().convertTo(Integer.class, s);
+            if (size == null || size <= 0) {
+                throw new IllegalArgumentException("Property " + Exchange.MAXIMUM_CACHE_POOL_SIZE + " must be a positive number, was: " + s);
+            }
+            return size;
+        }
+
+        // 1000 is the default fallback
+        return 1000;
+    }
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java Wed Apr  7 06:31:41 2010
@@ -97,6 +97,8 @@ public class SedaConcurrentTest extends 
 
         // use our own template that has a higher thread pool than default camel that uses 5
         ProducerTemplate pt = new DefaultProducerTemplate(context, Executors.newFixedThreadPool(10));
+        // must start the template
+        pt.start();
 
         List<Future> replies = new ArrayList<Future>(20);
         for (int i = 0; i < 20; i++) {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java Wed Apr  7 06:31:41 2010
@@ -30,7 +30,7 @@ import org.apache.camel.util.ServiceHelp
 public class VmRouteTest extends TestSupport {
     private CamelContext context1 = new DefaultCamelContext();
     private CamelContext context2 = new DefaultCamelContext();
-    private ProducerTemplate template = context1.createProducerTemplate();
+    private ProducerTemplate template;
     private Object expectedBody = "<hello>world!</hello>";
 
     public void testSedaQueue() throws Exception {
@@ -59,11 +59,13 @@ public class VmRouteTest extends TestSup
         });
 
         ServiceHelper.startServices(context1, context2);
+
+        template = context1.createProducerTemplate();
     }
 
     @Override
     protected void tearDown() throws Exception {
-        ServiceHelper.stopServices(context2, context1);
+        ServiceHelper.stopServices(context2, context1, template);
         super.tearDown();
     }
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomProducerServicePoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomProducerServicePoolTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomProducerServicePoolTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomProducerServicePoolTest.java Wed Apr  7 06:31:41 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.impl;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
@@ -38,6 +39,10 @@ public class CustomProducerServicePoolTe
 
     private class MyEndpoint extends DefaultEndpoint {
 
+        private MyEndpoint(String endpointUri, CamelContext camelContext) {
+            super(endpointUri, camelContext);
+        }
+
         public Producer createProducer() throws Exception {
             return new MyProducer(this);
         }
@@ -71,14 +76,6 @@ public class CustomProducerServicePoolTe
 
         private Producer producer;
 
-        public void setCapacity(int capacity) {
-            // noop
-        }
-
-        public int getCapacity() {
-            return 0;
-        }
-
         public Producer addAndAcquire(Endpoint endpoint, Producer producer) {
             if (endpoint instanceof MyEndpoint) {
                 return producer;
@@ -122,7 +119,7 @@ public class CustomProducerServicePoolTe
         pool.start();
         context.setProducerServicePool(pool);
 
-        context.addEndpoint("my", new MyEndpoint());
+        context.addEndpoint("my", new MyEndpoint("my", context));
 
         Endpoint endpoint = context.getEndpoint("my");
 
@@ -147,7 +144,7 @@ public class CustomProducerServicePoolTe
     }
 
     public void testCustomProducerServicePoolInRoute() throws Exception {
-        context.addEndpoint("my", new MyEndpoint());
+        context.addEndpoint("my", new MyEndpoint("my", context));
 
         MyPool pool = new MyPool();
         pool.start();

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java Wed Apr  7 06:31:41 2010
@@ -26,12 +26,12 @@ import org.apache.camel.PollingConsumer;
 public class DefaultConsumerCacheTest extends ContextTestSupport {
 
     public void testCacheConsumers() throws Exception {
-        ConsumerCache cache = new ConsumerCache();
+        ConsumerCache cache = new ConsumerCache(context);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
 
-        // test that we cache at most 1000 producers to avoid it eating to much memory
+        // test that we cache at most 1000 consumers to avoid it eating too much memory
         for (int i = 0; i < 1003; i++) {
             Endpoint e = context.getEndpoint("direct:queue:" + i);
             PollingConsumer p = cache.getConsumer(e);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java Wed Apr  7 06:31:41 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.impl;
 
+import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -291,4 +292,42 @@ public class DefaultConsumerTemplateTest
         assertEquals("Bye World", out);
     }
 
+    public void testCacheConsumers() throws Exception {
+        ConsumerTemplate template = new DefaultConsumerTemplate(context);
+        template.setMaximumCacheSize(500);
+        template.start();
+
+        assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+
+        // test that we cache at most 500 consumers to avoid it eating too much memory
+        for (int i = 0; i < 503; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            Exchange ex = template.receiveNoWait(e);
+        }
+
+        assertEquals("Size should be 500", 500, template.getCurrentCacheSize());
+        template.stop();
+
+        // should be 0
+        assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+    }
+
+    public void testCacheConsumersFromContext() throws Exception {
+        ConsumerTemplate template = context.createConsumerTemplate(500);
+
+        assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+
+        // test that we cache at most 500 consumers to avoid it eating too much memory
+        for (int i = 0; i < 503; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            Exchange ex = template.receiveNoWait(e);
+        }
+
+        assertEquals("Size should be 500", 500, template.getCurrentCacheSize());
+        template.stop();
+
+        // should be 0
+        assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+    }
+
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java?rev=931444&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java Wed Apr  7 06:31:41 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultConsumerTemplateWithCustomCacheMaxSizeTest extends ContextTestSupport {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.getProperties().put(Exchange.MAXIMUM_CACHE_POOL_SIZE, "200");
+        return context;
+    }
+
+    public void testCacheConsumers() throws Exception {
+        ConsumerTemplate template = context.createConsumerTemplate();
+
+        assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+
+        // test that we cache at most 200 consumers to avoid it eating too much memory
+        for (int i = 0; i < 203; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            template.receiveNoWait(e);
+        }
+
+        assertEquals("Size should be 200", 200, template.getCurrentCacheSize());
+        template.stop();
+
+        // should be 0
+        assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+    }
+
+    public void testInvalidSizeABC() {
+        context.getProperties().put(Exchange.MAXIMUM_CACHE_POOL_SIZE, "ABC");
+        try {
+            context.createConsumerTemplate();
+            fail("Should have thrown an exception");
+        } catch (Exception e) {
+            assertEquals("Property CamelMaximumCachePoolSize must be a positive number, was: ABC", e.getMessage());
+        }
+    }
+
+    public void testInvalidSizeZero() {
+        context.getProperties().put(Exchange.MAXIMUM_CACHE_POOL_SIZE, "0");
+        try {
+            context.createConsumerTemplate();
+            fail("Should have thrown an exception");
+        } catch (Exception e) {
+            assertEquals("Property CamelMaximumCachePoolSize must be a positive number, was: 0", e.getMessage());
+        }
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java Wed Apr  7 06:31:41 2010
@@ -108,6 +108,7 @@ public class DefaultProducerTemplateTest
 
     public void testRequestUsingDefaultEndpoint() throws Exception {
         ProducerTemplate producer = new DefaultProducerTemplate(context, context.getEndpoint("direct:out"));
+        producer.start();
 
         Object out = producer.requestBody("Hello");
         assertEquals("Bye Bye World", out);
@@ -118,10 +119,13 @@ public class DefaultProducerTemplateTest
         Map<String, Object> headers = new HashMap<String, Object>();
         out = producer.requestBodyAndHeaders("Hello", headers);
         assertEquals("Bye Bye World", out);
+
+        producer.stop();
     }
 
     public void testSendUsingDefaultEndpoint() throws Exception {
         ProducerTemplate producer = new DefaultProducerTemplate(context, context.getEndpoint("direct:in"));
+        producer.start();
 
         getMockEndpoint("mock:result").expectedMessageCount(3);
 
@@ -131,6 +135,8 @@ public class DefaultProducerTemplateTest
         producer.sendBodyAndHeaders("Hello", headers);
 
         assertMockEndpointsSatisfied();
+
+        producer.stop();
     }
 
     @Override
@@ -169,4 +175,43 @@ public class DefaultProducerTemplateTest
             }
         };
     }
+
+    public void testCacheProducers() throws Exception {
+        ProducerTemplate template = new DefaultProducerTemplate(context);
+        template.setMaximumCacheSize(500);
+        template.start();
+
+        assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+
+        // test that we cache at most 500 producers to avoid it eating too much memory
+        for (int i = 0; i < 503; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            template.sendBody(e, "Hello");
+        }
+
+        assertEquals("Size should be 500", 500, template.getCurrentCacheSize());
+        template.stop();
+
+        // should be 0
+        assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+    }
+
+    public void testCacheProducersFromContext() throws Exception {
+        ProducerTemplate template = context.createProducerTemplate(500);
+
+        assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+
+        // test that we cache at most 500 producers to avoid it eating too much memory
+        for (int i = 0; i < 503; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            template.sendBody(e, "Hello");
+        }
+
+        assertEquals("Size should be 500", 500, template.getCurrentCacheSize());
+        template.stop();
+
+        // should be 0
+        assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+    }
+
 }