You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/07 08:31:43 UTC
svn commit: r931444 [1/2] - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/component/bean/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/management/mbean...
Author: davsclaus
Date: Wed Apr 7 06:31:41 2010
New Revision: 931444
URL: http://svn.apache.org/viewvc?rev=931444&view=rev
Log:
CAMEL-2558: ProducerTemplate and ConsumerTemplate can now have their maximum cache size configured. Also, CamelContext now returns already started templates from its create template methods.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateWithCustomCacheMaxSizeTest.java (with props)
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/ConsumerTemplateMaximumCacheSizeTest.java (with props)
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/ProducerTemplateMaximumCacheSizeTest.java
- copied, changed from r931425, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/ProducerTemplateAutoRegisterTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/config/ConsumerTemplateMaximumCacheSizeTest-context.xml (with props)
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/config/ProducerTemplateMaximumCacheSizeTest-context.xml
- copied, changed from r931425, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/config/ProducerTemplateAutoRegisterTest-context.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerServicePool.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ServicePool.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/CamelContextHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomProducerServicePoolTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelFineGrainedErrorHandlingTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitRouteNumberOfProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionProcessorInspectCausedExceptionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionProcessorInspectCausedExceptionWithDefaultErrorHandlerTest.java
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerTest.java
camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/transport/CamelJBIClientProxyTest.java
camel/trunk/components/camel-guice/src/main/java/org/apache/camel/guice/Main.java
camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcEndpointTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutWithSpringRestartIssueTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsRedeliveryWithInitialRedeliveryDelayTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/TransactionErrorHandlerRedeliveryDelayTest.java
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelConsumerTemplateFactoryBean.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelProducerTemplateFactoryBean.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/CustomProcessorWithNamespacesTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/RoutingUsingCamelContextFactoryTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/CamelContextAutoStartupTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/scan/SpringComponentScanTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/scan/SpringComponentScanWithDeprecatedPackagesTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/PojoDualCamelContextConsumerTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/ContainerWideInterceptorTest.java
camel/trunk/components/camel-velocity/src/test/java/org/apache/camel/component/velocity/VelocityDynamicTemplateTest.java
camel/trunk/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Wed Apr 7 06:31:41 2010
@@ -79,6 +79,8 @@ public interface CamelContext extends Se
/**
* Adds a service, starting it so that it will be stopped with this context
+ * <p/>
+ * The added service will also be enlisted in JMX for management (if JMX is enabled)
*
* @param object the service
* @throws Exception can be thrown when starting the service
@@ -460,28 +462,62 @@ public interface CamelContext extends Se
List<String> getLanguageNames();
/**
- * Creates a new {@link ProducerTemplate} which is <b>not</b> started.
+ * Creates a new {@link ProducerTemplate} which is <b>started</b> and therefore ready to use right away.
+ * <p/>
+ * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
+ * Why does Camel use too many threads with ProducerTemplate?</a>
+ * <p/>
+ * Will use cache size defined in Camel property with key {@link Exchange#MAXIMUM_CACHE_POOL_SIZE}.
+ * If no key was defined then it will fallback to a default size of 1000.
+ * You can also use the {@link org.apache.camel.ProducerTemplate#setMaximumCacheSize(int)} method to use a custom value
+ * before starting the template.
+ *
+ * @return the template
+ * @throws Exception is thrown if error starting the template
+ */
+ ProducerTemplate createProducerTemplate() throws Exception;
+
+ /**
+ * Creates a new {@link ProducerTemplate} which is <b>started</b> and therefore ready to use right away.
* <p/>
* The returned template is already started, so you do <b>not</b> need to start it before it is used.
* <p/>
* See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
* Why does Camel use too many threads with ProducerTemplate?</a>
*
+ * @param maximumCacheSize the maximum cache size
* @return the template
+ * @throws Exception is thrown if error starting the template
*/
- ProducerTemplate createProducerTemplate();
+ ProducerTemplate createProducerTemplate(int maximumCacheSize) throws Exception;
/**
- * Creates a new {@link ConsumerTemplate} which is <b>not</b> started.
+ * Creates a new {@link ConsumerTemplate} which is <b>started</b> and therefore ready to use right away.
* <p/>
- * You <b>must</b> start the template before its being used.
+ * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
+ * Why does Camel use too many threads with ProducerTemplate?</a> as it also applies for ConsumerTemplate.
+ * <p/>
+ * Will use cache size defined in Camel property with key {@link Exchange#MAXIMUM_CACHE_POOL_SIZE}.
+ * If no key was defined then it will fallback to a default size of 1000.
+ * You can also use the {@link org.apache.camel.ConsumerTemplate#setMaximumCacheSize(int)} method to use a custom value
+ * before starting the template.
+ *
+ * @return the template
+ * @throws Exception is thrown if error starting the template
+ */
+ ConsumerTemplate createConsumerTemplate() throws Exception;
+
+ /**
+ * Creates a new {@link ConsumerTemplate} which is <b>started</b> and therefore ready to use right away.
* <p/>
* See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
* Why does Camel use too many threads with ProducerTemplate?</a> as it also applies for ConsumerTemplate.
*
+ * @param maximumCacheSize the maximum cache size
* @return the template
+ * @throws Exception is thrown if error starting the template
*/
- ConsumerTemplate createConsumerTemplate();
+ ConsumerTemplate createConsumerTemplate(int maximumCacheSize) throws Exception;
/**
* Adds the given interceptor strategy
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java Wed Apr 7 06:31:41 2010
@@ -46,6 +46,33 @@ package org.apache.camel;
*/
public interface ConsumerTemplate extends Service {
+ // Configuration methods
+ // -----------------------------------------------------------------------
+
+ /**
+ * Gets the maximum cache size used.
+ *
+ * @return the maximum cache size
+ */
+ int getMaximumCacheSize();
+
+ /**
+ * Sets a custom maximum cache size.
+ *
+ * @param maximumCacheSize the custom maximum cache size
+ */
+ void setMaximumCacheSize(int maximumCacheSize);
+
+ /**
+ * Gets an approximated size of the current cached resources in the backing cache pools.
+ *
+ * @return the size of current cached resources
+ */
+ int getCurrentCacheSize();
+
+ // Synchronous methods
+ // -----------------------------------------------------------------------
+
/**
* Receives from the endpoint, waiting until there is a response
*
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Apr 7 06:31:41 2010
@@ -92,7 +92,8 @@ public interface Exchange {
String LOOP_INDEX = "CamelLoopIndex";
String LOOP_SIZE = "CamelLoopSize";
- String MULTICAST_INDEX = "CamelMulticastIndex";
+ String MAXIMUM_CACHE_POOL_SIZE = "CamelMaximumCachePoolSize";
+ String MULTICAST_INDEX = "CamelMulticastIndex";
String ON_COMPLETION = "CamelOnCompletion";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Wed Apr 7 06:31:41 2010
@@ -30,7 +30,7 @@ import org.apache.camel.spi.Synchronizat
* {@link Exchange} to an {@link Endpoint}.
* <p/>
* <b>All</b> methods throws {@link RuntimeCamelException} if processing of
- * the {@link Exchange} failed and an Exception occured. The <tt>getCause</tt>
+ * the {@link Exchange} failed and an Exception occurred. The <tt>getCause</tt>
* method on {@link RuntimeCamelException} returns the wrapper original caused
* exception.
* <p/>
@@ -50,6 +50,30 @@ import org.apache.camel.spi.Synchronizat
*/
public interface ProducerTemplate extends Service {
+ // Configuration methods
+ // -----------------------------------------------------------------------
+
+ /**
+ * Gets the maximum cache size used in the backing cache pools.
+ *
+ * @return the maximum cache size
+ */
+ int getMaximumCacheSize();
+
+ /**
+ * Sets a custom maximum cache size to use in the backing cache pools.
+ *
+ * @param maximumCacheSize the custom maximum cache size
+ */
+ void setMaximumCacheSize(int maximumCacheSize);
+
+ /**
+ * Gets an approximated size of the current cached resources in the backing cache pools.
+ *
+ * @return the size of current cached resources
+ */
+ int getCurrentCacheSize();
+
// Synchronous methods
// -----------------------------------------------------------------------
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java Wed Apr 7 06:31:41 2010
@@ -45,7 +45,7 @@ import java.lang.annotation.Target;
public @interface RecipientList {
String context() default "";
String delimiter() default ",";
- boolean parallelProcessoing() default false;
+ boolean parallelProcessing() default false;
boolean stopOnException() default false;
String strategyRef() default "";
String executorServiceRef() default "";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java Wed Apr 7 06:31:41 2010
@@ -36,6 +36,7 @@ import org.apache.camel.processor.Recipi
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,6 +61,8 @@ public class MethodInfo {
private ExchangePattern pattern = ExchangePattern.InOut;
private RecipientList recipientList;
+ // TODO: This class should extend ServiceSupport so we can clean up recipientList when stopping
+
public MethodInfo(CamelContext camelContext, Class<?> type, Method method, List<ParameterInfo> parameters, List<ParameterInfo> bodyParameters,
boolean hasCustomAnnotation, boolean hasHandlerAnnotation, boolean voidAsInOnly) {
this.camelContext = camelContext;
@@ -85,16 +88,16 @@ public class MethodInfo {
org.apache.camel.RecipientList annotation = method.getAnnotation(org.apache.camel.RecipientList.class);
- recipientList = new RecipientList(annotation.delimiter());
+ recipientList = new RecipientList(camelContext, annotation.delimiter());
recipientList.setStopOnException(annotation.stopOnException());
- recipientList.setParallelProcessing(annotation.parallelProcessoing());
+ recipientList.setParallelProcessing(annotation.parallelProcessing());
if (ObjectHelper.isNotEmpty(annotation.executorServiceRef())) {
ExecutorService executor = CamelContextHelper.mandatoryLookup(camelContext, annotation.executorServiceRef(), ExecutorService.class);
recipientList.setExecutorService(executor);
}
- if (annotation.parallelProcessoing() && recipientList.getExecutorService() == null) {
+ if (annotation.parallelProcessing() && recipientList.getExecutorService() == null) {
// we are running in parallel so we need a thread pool
ExecutorService executor = camelContext.getExecutorServiceStrategy().newDefaultThreadPool(this, "@RecipientList");
recipientList.setExecutorService(executor);
@@ -140,6 +143,10 @@ public class MethodInfo {
}
Object result = invoke(method, pojo, arguments, exchange);
if (recipientList != null) {
+ // ensure its started
+ if (!recipientList.isStarted()) {
+ ServiceHelper.startService(recipientList);
+ }
recipientList.sendToRecipientList(exchange, result);
// we don't want to return the list of endpoints
// return Void to indicate to BeanProcessor that there is no reply
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java Wed Apr 7 06:31:41 2010
@@ -165,14 +165,28 @@ public class CamelPostProcessorHelper im
protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
// endpoint is optional for this injection point
Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, false);
- return new DefaultProducerTemplate(getCamelContext(), endpoint);
+ ProducerTemplate answer = new DefaultProducerTemplate(getCamelContext(), endpoint);
+ // start the template so its ready to use
+ try {
+ answer.start();
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ return answer;
}
/**
* Factory method to create a {@link org.apache.camel.ConsumerTemplate} to be injected into a POJO
*/
protected ConsumerTemplate createInjectionConsumerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
- return new DefaultConsumerTemplate(getCamelContext());
+ ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext());
+ // start the template so its ready to use
+ try {
+ answer.start();
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ return answer;
}
/**
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java Wed Apr 7 06:31:41 2010
@@ -18,11 +18,13 @@ package org.apache.camel.impl;
import java.util.Map;
+import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.FailedToCreateConsumerException;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumer;
+import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
@@ -35,14 +37,19 @@ import org.apache.commons.logging.LogFac
*/
public class ConsumerCache extends ServiceSupport {
private static final transient Log LOG = LogFactory.getLog(ConsumerCache.class);
-
+ private final CamelContext camelContext;
private final Map<String, PollingConsumer> consumers;
- public ConsumerCache() {
- this.consumers = new LRUCache<String, PollingConsumer>(1000);
+ public ConsumerCache(CamelContext camelContext) {
+ this(camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
+ }
+
+ public ConsumerCache(CamelContext camelContext, int maximumCacheSize) {
+ this(camelContext, new LRUCache<String, PollingConsumer>(maximumCacheSize));
}
- public ConsumerCache(Map<String, PollingConsumer> cache) {
+ public ConsumerCache(CamelContext camelContext, Map<String, PollingConsumer> cache) {
+ this.camelContext = camelContext;
this.consumers = cache;
}
@@ -59,7 +66,7 @@ public class ConsumerCache extends Servi
boolean singleton = true;
if (answer instanceof IsSingleton) {
- singleton = ((IsSingleton)answer).isSingleton();
+ singleton = ((IsSingleton) answer).isSingleton();
}
if (singleton) {
@@ -103,12 +110,13 @@ public class ConsumerCache extends Servi
return consumer.receiveNoWait();
}
- protected void doStop() throws Exception {
- ServiceHelper.stopServices(consumers.values());
- consumers.clear();
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(consumers);
}
- protected void doStart() throws Exception {
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(consumers);
+ consumers.clear();
}
/**
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Wed Apr 7 06:31:41 2010
@@ -89,6 +89,7 @@ import org.apache.camel.spi.RouteStartup
import org.apache.camel.spi.ServicePool;
import org.apache.camel.spi.ShutdownStrategy;
import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.LRUCache;
@@ -501,7 +502,7 @@ public class DefaultCamelContext extends
/**
* Strategy to add the given endpoint to the internal endpoint registry
*
- * @param uri uri of endpoint
+ * @param uri uri of endpoint
* @param endpoint the endpoint to add
* @return the added endpoint
*/
@@ -936,12 +937,30 @@ public class DefaultCamelContext extends
this.delay = delay;
}
- public ProducerTemplate createProducerTemplate() {
- return new DefaultProducerTemplate(this);
+ public ProducerTemplate createProducerTemplate() throws Exception {
+ int size = CamelContextHelper.getMaximumCachePoolSize(this);
+ return createProducerTemplate(size);
}
- public ConsumerTemplate createConsumerTemplate() {
- return new DefaultConsumerTemplate(this);
+ public ProducerTemplate createProducerTemplate(int maximumCacheSize) throws Exception {
+ DefaultProducerTemplate answer = new DefaultProducerTemplate(this);
+ answer.setMaximumCacheSize(maximumCacheSize);
+ // start it so its ready to use
+ answer.start();
+ return answer;
+ }
+
+ public ConsumerTemplate createConsumerTemplate() throws Exception {
+ int size = CamelContextHelper.getMaximumCachePoolSize(this);
+ return createConsumerTemplate(size);
+ }
+
+ public ConsumerTemplate createConsumerTemplate(int maximumCacheSize) throws Exception {
+ DefaultConsumerTemplate answer = new DefaultConsumerTemplate(this);
+ answer.setMaximumCacheSize(maximumCacheSize);
+ // start it so its ready to use
+ answer.start();
+ return answer;
}
public ErrorHandlerBuilder getErrorHandlerBuilder() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java Wed Apr 7 06:31:41 2010
@@ -16,42 +16,50 @@
*/
package org.apache.camel.impl;
-
import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.ServiceHelper;
import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
/**
* @version $Revision$
*/
-public class DefaultConsumerTemplate implements ConsumerTemplate {
+public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerTemplate {
private final CamelContext context;
- private final ConsumerCache consumerCache = new ConsumerCache();
+ private ConsumerCache consumerCache;
+ private int maximumCacheSize;
public DefaultConsumerTemplate(CamelContext context) {
this.context = context;
}
- public void start() throws Exception {
- consumerCache.start();
+ public int getMaximumCacheSize() {
+ return maximumCacheSize;
+ }
+
+ public void setMaximumCacheSize(int maximumCacheSize) {
+ this.maximumCacheSize = maximumCacheSize;
}
- public void stop() throws Exception {
- consumerCache.stop();
+ public int getCurrentCacheSize() {
+ if (consumerCache == null) {
+ return 0;
+ }
+ return consumerCache.size();
}
-
+
public CamelContext getCamelContext() {
return context;
}
public Exchange receive(String endpointUri) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
- return consumerCache.receive(endpoint);
+ return getConsumerCache().receive(endpoint);
}
public Exchange receive(Endpoint endpoint) {
@@ -60,7 +68,7 @@ public class DefaultConsumerTemplate imp
public Exchange receive(String endpointUri, long timeout) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
- return consumerCache.receive(endpoint, timeout);
+ return getConsumerCache().receive(endpoint, timeout);
}
public Exchange receive(Endpoint endpoint, long timeout) {
@@ -69,7 +77,7 @@ public class DefaultConsumerTemplate imp
public Exchange receiveNoWait(String endpointUri) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
- return consumerCache.receiveNoWait(endpoint);
+ return getConsumerCache().receiveNoWait(endpoint);
}
public Exchange receiveNoWait(Endpoint endpoint) {
@@ -162,4 +170,28 @@ public class DefaultConsumerTemplate imp
}
return answer;
}
+
+ private ConsumerCache getConsumerCache() {
+ if (!isStarted()) {
+ throw new IllegalStateException("ConsumerTemplate has not been started");
+ }
+ return consumerCache;
+ }
+
+ protected void doStart() throws Exception {
+ if (consumerCache == null) {
+ if (maximumCacheSize > 0) {
+ consumerCache = new ConsumerCache(context, maximumCacheSize);
+ } else {
+ consumerCache = new ConsumerCache(context);
+ }
+ }
+ ServiceHelper.startService(consumerCache);
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(consumerCache);
+ consumerCache = null;
+ }
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerServicePool.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerServicePool.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerServicePool.java Wed Apr 7 06:31:41 2010
@@ -16,8 +16,6 @@
*/
package org.apache.camel.impl;
-import java.util.concurrent.BlockingQueue;
-
import org.apache.camel.Endpoint;
import org.apache.camel.Producer;
@@ -35,12 +33,4 @@ public class DefaultProducerServicePool
super(capacity);
}
- public synchronized int size() {
- int size = 0;
- for (BlockingQueue<Producer> queue : pool.values()) {
- size += queue.size();
- }
- return size;
- }
-
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Wed Apr 7 06:31:41 2010
@@ -38,26 +38,21 @@ import org.apache.camel.util.ObjectHelpe
import org.apache.camel.util.ServiceHelper;
/**
- * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
- * et al) for working with Camel and sending {@link org.apache.camel.Message} instances in an
- * {@link org.apache.camel.Exchange} to an {@link org.apache.camel.Endpoint}.
- *
* @version $Revision$
*/
public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
private final CamelContext context;
- private final ProducerCache producerCache;
+ private ProducerCache producerCache;
private Endpoint defaultEndpoint;
private ExecutorService executor;
+ private int maximumCacheSize;
public DefaultProducerTemplate(CamelContext context) {
this.context = context;
- this.producerCache = new ProducerCache(context);
}
public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
this.context = context;
- this.producerCache = new ProducerCache(context);
this.executor = executor;
}
@@ -71,6 +66,21 @@ public class DefaultProducerTemplate ext
return new DefaultProducerTemplate(camelContext, endpoint);
}
+ public int getMaximumCacheSize() {
+ return maximumCacheSize;
+ }
+
+ public void setMaximumCacheSize(int maximumCacheSize) {
+ this.maximumCacheSize = maximumCacheSize;
+ }
+
+ public int getCurrentCacheSize() {
+ if (producerCache == null) {
+ return 0;
+ }
+ return producerCache.size();
+ }
+
public Exchange send(String endpointUri, Exchange exchange) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
return send(endpoint, exchange);
@@ -87,16 +97,16 @@ public class DefaultProducerTemplate ext
}
public Exchange send(Endpoint endpoint, Exchange exchange) {
- producerCache.send(endpoint, exchange);
+ getProducerCache().send(endpoint, exchange);
return exchange;
}
public Exchange send(Endpoint endpoint, Processor processor) {
- return producerCache.send(endpoint, processor);
+ return getProducerCache().send(endpoint, processor);
}
public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
- return producerCache.send(endpoint, pattern, processor);
+ return getProducerCache().send(endpoint, pattern, processor);
}
public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) {
@@ -680,7 +690,7 @@ public class DefaultProducerTemplate ext
public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) {
Callable<Exchange> task = new Callable<Exchange>() {
public Exchange call() throws Exception {
- Exchange answer = producerCache.send(endpoint, processor);
+ Exchange answer = getProducerCache().send(endpoint, processor);
// invoke callback before returning answer
// as it allows callback to be used without UnitOfWorkProcessor invoking it
@@ -701,7 +711,21 @@ public class DefaultProducerTemplate ext
return executor.submit(task);
}
+ private ProducerCache getProducerCache() {
+ if (!isStarted()) {
+ throw new IllegalStateException("ProducerTemplate has not been started");
+ }
+ return producerCache;
+ }
+
protected void doStart() throws Exception {
+ if (producerCache == null) {
+ if (maximumCacheSize > 0) {
+ producerCache = new ProducerCache(context, maximumCacheSize);
+ } else {
+ producerCache = new ProducerCache(context);
+ }
+ }
ServiceHelper.startService(producerCache);
if (executor == null) {
executor = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "ProducerTemplate");
@@ -710,6 +734,8 @@ public class DefaultProducerTemplate ext
protected void doStop() throws Exception {
ServiceHelper.stopService(producerCache);
+ producerCache = null;
+
if (executor != null) {
context.getExecutorServiceStrategy().shutdownNow(executor);
executor = null;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java Wed Apr 7 06:31:41 2010
@@ -46,6 +46,14 @@ public abstract class DefaultServicePool
this.capacity = capacity;
}
+ public synchronized int size() {
+ int size = 0;
+ for (BlockingQueue<Service> entry : pool.values()) {
+ size += entry.size();
+ }
+ return size;
+ }
+
public synchronized Service addAndAcquire(Key key, Service service) {
BlockingQueue<Service> entry = pool.get(key);
if (entry == null) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java Wed Apr 7 06:31:41 2010
@@ -323,14 +323,14 @@ public abstract class MainSupport extend
* Returns a {@link org.apache.camel.ProducerTemplate} from the Spring {@link org.springframework.context.ApplicationContext} instances
* or lazily creates a new one dynamically
*/
- public ProducerTemplate getCamelTemplate() {
+ public ProducerTemplate getCamelTemplate() throws Exception {
if (camelTemplate == null) {
camelTemplate = findOrCreateCamelTemplate();
}
return camelTemplate;
}
- protected abstract ProducerTemplate findOrCreateCamelTemplate();
+ protected abstract ProducerTemplate findOrCreateCamelTemplate() throws Exception;
protected abstract Map<String, CamelContext> getCamelContextMap();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Wed Apr 7 06:31:41 2010
@@ -28,11 +28,13 @@ import org.apache.camel.Producer;
import org.apache.camel.ProducerCallback;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.spi.ServicePool;
+import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
/**
@@ -43,22 +45,24 @@ import static org.apache.camel.util.Obje
public class ProducerCache extends ServiceSupport {
private static final transient Log LOG = LogFactory.getLog(ProducerCache.class);
- private final Map<String, Producer> producers;
- private final ServicePool<Endpoint, Producer> pool;
- private final CamelContext context;
+ // TODO: Expose this cache for management in JMX (also ConsumerCache)
+ // TODO: Add source information so we know who uses this cache
+ // TODO: Add purge operation to purge the cache
- // TODO: Have easy configuration of pooling in Camel
+ private final CamelContext camelContext;
+ private final ServicePool<Endpoint, Producer> pool;
+ private final Map<String, Producer> producers;
- public ProducerCache(CamelContext context) {
- this(context, context.getProducerServicePool());
+ public ProducerCache(CamelContext camelContext) {
+ this(camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
}
- public ProducerCache(CamelContext context, ServicePool<Endpoint, Producer> producerServicePool) {
- this(context, producerServicePool, new LRUCache<String, Producer>(1000));
+ public ProducerCache(CamelContext camelContext, int cacheSize) {
+ this(camelContext, camelContext.getProducerServicePool(), new LRUCache<String, Producer>(cacheSize));
}
- public ProducerCache(CamelContext context, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
- this.context = context;
+ public ProducerCache(CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
+ this.camelContext = camelContext;
this.pool = producerServicePool;
this.producers = cache;
}
@@ -258,23 +262,23 @@ public class ProducerCache extends Servi
}
protected void doStop() throws Exception {
- ServiceHelper.stopServices(producers);
+ ServiceHelper.stopServices(producers, pool);
producers.clear();
-
- ServiceHelper.stopServices(pool);
}
protected void doStart() throws Exception {
- ServiceHelper.startServices(pool);
+ ServiceHelper.startServices(pool, producers);
}
/**
- * Returns the current size of the producer cache
+ * Returns the current size of the cache
*
* @return the current size
*/
int size() {
- return producers.size();
+ int size = producers.size();
+ size += pool.size();
+ return size;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java Wed Apr 7 06:31:41 2010
@@ -135,15 +135,22 @@ public class ManagedCamelContext {
@ManagedOperation(description = "Send body (in only)")
public void sendBody(String endpointUri, String body) throws Exception {
ProducerTemplate template = context.createProducerTemplate();
- template.sendBody(endpointUri, body);
- template.stop();
+ try {
+ template.sendBody(endpointUri, body);
+ } finally {
+ template.stop();
+ }
}
@ManagedOperation(description = "Request body (in out)")
public Object requestBody(String endpointUri, String body) throws Exception {
ProducerTemplate template = context.createProducerTemplate();
- Object answer = template.requestBody(endpointUri, body);
- template.stop();
+ Object answer = null;
+ try {
+ answer = template.requestBody(endpointUri, body);
+ } finally {
+ template.stop();
+ }
return answer;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Wed Apr 7 06:31:41 2010
@@ -85,9 +85,9 @@ public class RecipientListDefinition<Typ
RecipientList answer;
if (delimiter != null) {
- answer = new RecipientList(expression, delimiter);
+ answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter);
} else {
- answer = new RecipientList(expression);
+ answer = new RecipientList(routeContext.getCamelContext(), expression);
}
answer.setAggregationStrategy(createAggregationStrategy(routeContext));
answer.setParallelProcessing(isParallelProcessing());
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java Wed Apr 7 06:31:41 2010
@@ -69,7 +69,7 @@ public class RoutingSlipDefinition exten
public Processor createProcessor(RouteContext routeContext) throws Exception {
ObjectHelper.notEmpty(getHeaderName(), "headerName", this);
ObjectHelper.notEmpty(getUriDelimiter(), "uriDelimiter", this);
- return new RoutingSlip(getHeaderName(), getUriDelimiter());
+ return new RoutingSlip(routeContext.getCamelContext(), getHeaderName(), getUriDelimiter());
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Wed Apr 7 06:31:41 2010
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
@@ -34,6 +35,9 @@ import org.apache.camel.processor.aggreg
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+import static org.apache.camel.util.ObjectHelper.notNull;
/**
* Implements a dynamic <a
@@ -44,6 +48,7 @@ import org.apache.camel.util.ObjectHelpe
* @version $Revision$
*/
public class RecipientList extends ServiceSupport implements Processor {
+ private final CamelContext camelContext;
private ProducerCache producerCache;
private Expression expression;
private final String delimiter;
@@ -52,23 +57,28 @@ public class RecipientList extends Servi
private ExecutorService executorService;
private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
- public RecipientList() {
+ public RecipientList(CamelContext camelContext) {
// use comma by default as delimiter
- this.delimiter = ",";
+ this(camelContext, ",");
}
- public RecipientList(String delimiter) {
+ public RecipientList(CamelContext camelContext, String delimiter) {
+ notNull(camelContext, "camelContext");
+ ObjectHelper.notEmpty(delimiter, "delimiter");
+ this.camelContext = camelContext;
this.delimiter = delimiter;
}
- public RecipientList(Expression expression) {
+ public RecipientList(CamelContext camelContext, Expression expression) {
// use comma by default as delimiter
- this(expression, ",");
+ this(camelContext, expression, ",");
}
- public RecipientList(Expression expression, String delimiter) {
+ public RecipientList(CamelContext camelContext, Expression expression, String delimiter) {
+ notNull(camelContext, "camelContext");
ObjectHelper.notNull(expression, "expression");
ObjectHelper.notEmpty(delimiter, "delimiter");
+ this.camelContext = camelContext;
this.expression = expression;
this.delimiter = delimiter;
}
@@ -79,6 +89,10 @@ public class RecipientList extends Servi
}
public void process(Exchange exchange) throws Exception {
+ if (!isStarted()) {
+ throw new IllegalStateException("RecipientList has not been started: " + this);
+ }
+
Object receipientList = expression.evaluate(exchange, Object.class);
sendToRecipientList(exchange, receipientList);
}
@@ -91,7 +105,6 @@ public class RecipientList extends Servi
// we should acquire and release the producers we need so we can leverage the producer
// cache to the fullest
- ProducerCache cache = getProducerCache(exchange);
Map<Endpoint, Producer> producers = new LinkedHashMap<Endpoint, Producer>();
try {
List<Processor> processors = new ArrayList<Processor>();
@@ -99,7 +112,7 @@ public class RecipientList extends Servi
Object recipient = iter.next();
Endpoint endpoint = resolveEndpoint(exchange, recipient);
// acquire producer which we then release later
- Producer producer = cache.acquireProducer(endpoint);
+ Producer producer = producerCache.acquireProducer(endpoint);
processors.add(producer);
producers.put(endpoint, producer);
}
@@ -112,20 +125,11 @@ public class RecipientList extends Servi
} finally {
// and release the producers back to the producer cache
for (Map.Entry<Endpoint, Producer> entry : producers.entrySet()) {
- cache.releaseProducer(entry.getKey(), entry.getValue());
+ producerCache.releaseProducer(entry.getKey(), entry.getValue());
}
}
}
- protected ProducerCache getProducerCache(Exchange exchange) throws Exception {
- // setup producer cache as we need to use the pluggable service pool defined on camel context
- if (producerCache == null) {
- this.producerCache = new ProducerCache(exchange.getContext());
- this.producerCache.start();
- }
- return this.producerCache;
- }
-
protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
// trim strings as end users might have added spaces between separators
if (recipient instanceof String) {
@@ -135,15 +139,16 @@ public class RecipientList extends Servi
}
protected void doStart() throws Exception {
- if (producerCache != null) {
- producerCache.start();
+ if (producerCache == null) {
+ producerCache = new ProducerCache(camelContext);
+ // add it as a service so we can manage it
+ camelContext.addService(producerCache);
}
+ ServiceHelper.startService(producerCache);
}
protected void doStop() throws Exception {
- if (producerCache != null) {
- producerCache.stop();
- }
+ ServiceHelper.stopService(producerCache);
}
public boolean isParallelProcessing() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java Wed Apr 7 06:31:41 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -28,6 +29,7 @@ import org.apache.camel.impl.ProducerCac
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.model.RoutingSlipDefinition;
import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,15 +45,18 @@ public class RoutingSlip extends Service
private ProducerCache producerCache;
private final String header;
private final String uriDelimiter;
+ private final CamelContext camelContext;
- public RoutingSlip(String header) {
- this(header, RoutingSlipDefinition.DEFAULT_DELIMITER);
+ public RoutingSlip(CamelContext camelContext, String header) {
+ this(camelContext, header, RoutingSlipDefinition.DEFAULT_DELIMITER);
}
- public RoutingSlip(String header, String uriDelimiter) {
+ public RoutingSlip(CamelContext camelContext, String header, String uriDelimiter) {
+ notNull(camelContext, "camelContext");
notNull(header, "header");
notNull(uriDelimiter, "uriDelimiter");
+ this.camelContext = camelContext;
this.header = header;
this.uriDelimiter = uriDelimiter;
}
@@ -66,6 +71,10 @@ public class RoutingSlip extends Service
}
public void process(Exchange exchange) throws Exception {
+ if (!isStarted()) {
+ throw new IllegalStateException("RoutingSlip has not been started: " + this);
+ }
+
Message message = exchange.getIn();
String[] recipients = recipients(message);
Exchange current = exchange;
@@ -78,7 +87,7 @@ public class RoutingSlip extends Service
copyOutToIn(copy, current);
try {
- getProducerCache(exchange).doInProducer(endpoint, copy, null, new ProducerCallback<Object>() {
+ producerCache.doInProducer(endpoint, copy, null, new ProducerCallback<Object>() {
public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception {
// set property which endpoint we send to
exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
@@ -125,29 +134,21 @@ public class RoutingSlip extends Service
return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
}
- protected ProducerCache getProducerCache(Exchange exchange) throws Exception {
- // setup producer cache as we need to use the pluggable service pool defined on camel context
- if (producerCache == null) {
- this.producerCache = new ProducerCache(exchange.getContext());
- this.producerCache.start();
- }
- return this.producerCache;
- }
-
protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
return ExchangeHelper.resolveEndpoint(exchange, recipient);
}
protected void doStart() throws Exception {
- if (producerCache != null) {
- producerCache.start();
+ if (producerCache == null) {
+ producerCache = new ProducerCache(camelContext);
+ // add it as a service so we can manage it
+ camelContext.addService(producerCache);
}
+ ServiceHelper.startService(producerCache);
}
protected void doStop() throws Exception {
- if (producerCache != null) {
- producerCache.stop();
- }
+ ServiceHelper.stopService(producerCache);
}
private void updateRoutingSlip(Exchange current) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Wed Apr 7 06:31:41 2010
@@ -85,9 +85,8 @@ public class SendAsyncProcessor extends
public Exchange doProcess(Exchange exchange) throws Exception {
// now we are done, we should have a API callback for this
// send the exchange to the destination using a producer
- final ProducerCache cache = getProducerCache(exchange);
// acquire the producer from the service pool
- final Producer producer = cache.acquireProducer(destination);
+ final Producer producer = producerCache.acquireProducer(destination);
ObjectHelper.notNull(producer, "producer");
// pass in the callback that adds the exchange to the completed list of tasks
@@ -101,7 +100,7 @@ public class SendAsyncProcessor extends
} finally {
// must return the producer to service pool when we are done
try {
- cache.releaseProducer(destination, producer);
+ producerCache.releaseProducer(destination, producer);
} catch (Exception e) {
LOG.warn("Error releasing producer: " + producer + ". This exception will be ignored.", e);
}
@@ -282,7 +281,7 @@ public class SendAsyncProcessor extends
super.doStart();
if (poolSize <= 0) {
- throw new IllegalArgumentException("PoolSize must be a positive number");
+ throw new IllegalArgumentException("PoolSize must be a positive number, was: " + poolSize);
}
for (int i = 0; i < poolSize; i++) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Wed Apr 7 06:31:41 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -26,6 +27,7 @@ import org.apache.camel.impl.InterceptSe
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +38,7 @@ import org.apache.commons.logging.LogFac
*/
public class SendProcessor extends ServiceSupport implements Processor, Traceable {
protected static final transient Log LOG = LogFactory.getLog(SendProcessor.class);
+ protected final CamelContext camelContext;
protected ProducerCache producerCache;
protected Endpoint destination;
protected ExchangePattern pattern;
@@ -44,6 +47,8 @@ public class SendProcessor extends Servi
public SendProcessor(Endpoint destination) {
ObjectHelper.notNull(destination, "destination");
this.destination = destination;
+ this.camelContext = destination.getCamelContext();
+ ObjectHelper.notNull(this.camelContext, "camelContext");
}
public SendProcessor(Endpoint destination, ExchangePattern pattern) {
@@ -90,8 +95,12 @@ public class SendProcessor extends Servi
* @return the exchange that was processed
*/
public Exchange doProcess(final Exchange exchange) throws Exception {
+ if (!isStarted()) {
+ throw new IllegalStateException("SendProcessor has not been started: " + this);
+ }
+
// send the exchange to the destination using a producer
- return getProducerCache(exchange).doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
+ return producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
exchange = configureExchange(exchange, pattern);
producer.process(exchange);
@@ -100,15 +109,6 @@ public class SendProcessor extends Servi
});
}
- protected ProducerCache getProducerCache(Exchange exchange) throws Exception {
- // setup producer cache as we need to use the pluggable service pool defined on camel context
- if (producerCache == null) {
- producerCache = new ProducerCache(exchange.getContext());
- producerCache.start();
- }
- return producerCache;
- }
-
public Endpoint getDestination() {
return destination;
}
@@ -127,15 +127,16 @@ public class SendProcessor extends Servi
}
protected void doStart() throws Exception {
- if (producerCache != null) {
- producerCache.start();
+ if (producerCache == null) {
+ producerCache = new ProducerCache(camelContext);
+ // add it as a service so we can manage it
+ camelContext.addService(producerCache);
}
+ ServiceHelper.startService(producerCache);
}
protected void doStop() throws Exception {
- if (producerCache != null) {
- producerCache.stop();
- }
+ ServiceHelper.stopService(producerCache);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Wed Apr 7 06:31:41 2010
@@ -19,7 +19,6 @@ package org.apache.camel.processor;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -37,8 +36,6 @@ import org.apache.camel.util.ObjectHelpe
* @version $Revision$
*/
public class WireTapProcessor extends SendProcessor {
-
- private final CamelContext camelContext;
private final ExecutorService executorService;
// expression or processor used for populating a new exchange to send
@@ -50,14 +47,12 @@ public class WireTapProcessor extends Se
super(destination);
ObjectHelper.notNull(executorService, "executorService");
this.executorService = executorService;
- this.camelContext = destination.getCamelContext();
}
public WireTapProcessor(Endpoint destination, ExchangePattern pattern, ExecutorService executorService) {
super(destination, pattern);
ObjectHelper.notNull(executorService, "executorService");
this.executorService = executorService;
- this.camelContext = destination.getCamelContext();
}
@Override
@@ -71,7 +66,7 @@ public class WireTapProcessor extends Se
}
public void process(Exchange exchange) throws Exception {
- getProducerCache(exchange).doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
+ producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
Exchange wireTapExchange = configureExchange(exchange, pattern);
processWireTap(producer, wireTapExchange);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Apr 7 06:31:41 2010
@@ -598,7 +598,7 @@ public class AggregateProcessor extends
}
- ServiceHelper.startService(aggregationRepository);
+ ServiceHelper.startServices(processor, aggregationRepository);
// should we use recover checker
if (aggregationRepository instanceof RecoverableAggregationRepository) {
@@ -629,7 +629,7 @@ public class AggregateProcessor extends
@Override
protected void doStop() throws Exception {
- ServiceHelper.stopServices(timeoutMap, recoverService, aggregationRepository);
+ ServiceHelper.stopServices(timeoutMap, recoverService, aggregationRepository, processor);
if (closedCorrelationKeys != null) {
closedCorrelationKeys.clear();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ServicePool.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ServicePool.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ServicePool.java Wed Apr 7 06:31:41 2010
@@ -21,15 +21,18 @@ package org.apache.camel.spi;
* <p/>
* Services that is capable of being pooled should implement the marker interface
* {@link org.apache.camel.ServicePoolAware}.
+ * <p/>
+ * Notice the capacity is <b>per key</b> which means that each key can contain at most
+ * (the capacity) services. The pool can contain an unbounded number of keys.
*
* @version $Revision$
*/
-public interface ServicePool<Key, Service> extends org.apache.camel.Service {
+public interface ServicePool<Key, Service> {
/**
* Adds the given service to the pool and acquires it.
*
- * @param key the key
+ * @param key the key
* @param service the service
* @return the acquired service, is newer <tt>null</tt>
*/
@@ -37,7 +40,7 @@ public interface ServicePool<Key, Servic
/**
* Tries to acquire the service with the given key
- *
+ *
* @param key the key
* @return the acquired service, or <tt>null</tt> if no free in pool
*/
@@ -46,9 +49,16 @@ public interface ServicePool<Key, Servic
/**
* Releases the service back to the pool
*
- * @param key the key
+ * @param key the key
* @param service the service
*/
void release(Key key, Service service);
+ /**
+ * Returns the current size of the pool
+ *
+ * @return the current size of the pool
+ */
+ int size();
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/CamelContextHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/CamelContextHelper.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/CamelContextHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/CamelContextHelper.java Wed Apr 7 06:31:41 2010
@@ -18,6 +18,7 @@ package org.apache.camel.util;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
import org.apache.camel.NoSuchEndpointException;
import static org.apache.camel.util.ObjectHelper.isEmpty;
@@ -154,4 +155,28 @@ public final class CamelContextHelper {
return endpoint;
}
+ /**
+ * Gets the maximum cache pool size.
+ * <p/>
+ * Will use the property set on CamelContext with the key {@link Exchange#MAXIMUM_CACHE_POOL_SIZE}.
+ * If no property has been set, then it will fall back to returning a size of 1000.
+ *
+ * @param camelContext the camel context
+ * @return the maximum cache size
+ * @throws IllegalArgumentException is thrown if the property is not a positive number
+ */
+ public static int getMaximumCachePoolSize(CamelContext camelContext) throws IllegalArgumentException {
+ String s = camelContext.getProperties().get(Exchange.MAXIMUM_CACHE_POOL_SIZE);
+ if (s != null) {
+ Integer size = camelContext.getTypeConverter().convertTo(Integer.class, s);
+ if (size == null || size <= 0) {
+ throw new IllegalArgumentException("Property " + Exchange.MAXIMUM_CACHE_POOL_SIZE + " must be a positive number, was: " + s);
+ }
+ return size;
+ }
+
+ // 1000 is the default fallback
+ return 1000;
+ }
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java Wed Apr 7 06:31:41 2010
@@ -97,6 +97,8 @@ public class SedaConcurrentTest extends
// use our own template that has a higher thread pool than default camel that uses 5
ProducerTemplate pt = new DefaultProducerTemplate(context, Executors.newFixedThreadPool(10));
+ // must start the template
+ pt.start();
List<Future> replies = new ArrayList<Future>(20);
for (int i = 0; i < 20; i++) {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java Wed Apr 7 06:31:41 2010
@@ -30,7 +30,7 @@ import org.apache.camel.util.ServiceHelp
public class VmRouteTest extends TestSupport {
private CamelContext context1 = new DefaultCamelContext();
private CamelContext context2 = new DefaultCamelContext();
- private ProducerTemplate template = context1.createProducerTemplate();
+ private ProducerTemplate template;
private Object expectedBody = "<hello>world!</hello>";
public void testSedaQueue() throws Exception {
@@ -59,11 +59,13 @@ public class VmRouteTest extends TestSup
});
ServiceHelper.startServices(context1, context2);
+
+ template = context1.createProducerTemplate();
}
@Override
protected void tearDown() throws Exception {
- ServiceHelper.stopServices(context2, context1);
+ ServiceHelper.stopServices(context2, context1, template);
super.tearDown();
}
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomProducerServicePoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomProducerServicePoolTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomProducerServicePoolTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomProducerServicePoolTest.java Wed Apr 7 06:31:41 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.impl;
+import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
@@ -38,6 +39,10 @@ public class CustomProducerServicePoolTe
private class MyEndpoint extends DefaultEndpoint {
+ private MyEndpoint(String endpointUri, CamelContext camelContext) {
+ super(endpointUri, camelContext);
+ }
+
public Producer createProducer() throws Exception {
return new MyProducer(this);
}
@@ -71,14 +76,6 @@ public class CustomProducerServicePoolTe
private Producer producer;
- public void setCapacity(int capacity) {
- // noop
- }
-
- public int getCapacity() {
- return 0;
- }
-
public Producer addAndAcquire(Endpoint endpoint, Producer producer) {
if (endpoint instanceof MyEndpoint) {
return producer;
@@ -122,7 +119,7 @@ public class CustomProducerServicePoolTe
pool.start();
context.setProducerServicePool(pool);
- context.addEndpoint("my", new MyEndpoint());
+ context.addEndpoint("my", new MyEndpoint("my", context));
Endpoint endpoint = context.getEndpoint("my");
@@ -147,7 +144,7 @@ public class CustomProducerServicePoolTe
}
public void testCustomProducerServicePoolInRoute() throws Exception {
- context.addEndpoint("my", new MyEndpoint());
+ context.addEndpoint("my", new MyEndpoint("my", context));
MyPool pool = new MyPool();
pool.start();
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java Wed Apr 7 06:31:41 2010
@@ -26,12 +26,12 @@ import org.apache.camel.PollingConsumer;
public class DefaultConsumerCacheTest extends ContextTestSupport {
public void testCacheConsumers() throws Exception {
- ConsumerCache cache = new ConsumerCache();
+ ConsumerCache cache = new ConsumerCache(context);
cache.start();
assertEquals("Size should be 0", 0, cache.size());
- // test that we cache at most 1000 producers to avoid it eating to much memory
+ // test that we cache at most 1000 consumers to avoid it eating too much memory
for (int i = 0; i < 1003; i++) {
Endpoint e = context.getEndpoint("direct:queue:" + i);
PollingConsumer p = cache.getConsumer(e);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateTest.java Wed Apr 7 06:31:41 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.impl;
+import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -291,4 +292,42 @@ public class DefaultConsumerTemplateTest
assertEquals("Bye World", out);
}
+ public void testCacheConsumers() throws Exception {
+ ConsumerTemplate template = new DefaultConsumerTemplate(context);
+ template.setMaximumCacheSize(500);
+ template.start();
+
+ assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+
+ // test that we cache at most 500 consumers to avoid it eating too much memory
+ for (int i = 0; i < 503; i++) {
+ Endpoint e = context.getEndpoint("direct:queue:" + i);
+ Exchange ex = template.receiveNoWait(e);
+ }
+
+ assertEquals("Size should be 500", 500, template.getCurrentCacheSize());
+ template.stop();
+
+ // should be 0
+ assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+ }
+
+ public void testCacheConsumersFromContext() throws Exception {
+ ConsumerTemplate template = context.createConsumerTemplate(500);
+
+ assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+
+ // test that we cache at most 500 consumers to avoid it eating too much memory
+ for (int i = 0; i < 503; i++) {
+ Endpoint e = context.getEndpoint("direct:queue:" + i);
+ Exchange ex = template.receiveNoWait(e);
+ }
+
+ assertEquals("Size should be 500", 500, template.getCurrentCacheSize());
+ template.stop();
+
+ // should be 0
+ assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+ }
+
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java?rev=931444&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java Wed Apr 7 06:31:41 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultConsumerTemplateWithCustomCacheMaxSizeTest extends ContextTestSupport {
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ context.getProperties().put(Exchange.MAXIMUM_CACHE_POOL_SIZE, "200");
+ return context;
+ }
+
+ public void testCacheConsumers() throws Exception {
+ ConsumerTemplate template = context.createConsumerTemplate();
+
+ assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+
+ // test that we cache at most 200 consumers to avoid it eating too much memory
+ for (int i = 0; i < 203; i++) {
+ Endpoint e = context.getEndpoint("direct:queue:" + i);
+ template.receiveNoWait(e);
+ }
+
+ assertEquals("Size should be 200", 200, template.getCurrentCacheSize());
+ template.stop();
+
+ // should be 0
+ assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+ }
+
+ public void testInvalidSizeABC() {
+ context.getProperties().put(Exchange.MAXIMUM_CACHE_POOL_SIZE, "ABC");
+ try {
+ context.createConsumerTemplate();
+ fail("Should have thrown an exception");
+ } catch (Exception e) {
+ assertEquals("Property CamelMaximumCachePoolSize must be a positive number, was: ABC", e.getMessage());
+ }
+ }
+
+ public void testInvalidSizeZero() {
+ context.getProperties().put(Exchange.MAXIMUM_CACHE_POOL_SIZE, "0");
+ try {
+ context.createConsumerTemplate();
+ fail("Should have thrown an exception");
+ } catch (Exception e) {
+ assertEquals("Property CamelMaximumCachePoolSize must be a positive number, was: 0", e.getMessage());
+ }
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerTemplateWithCustomCacheMaxSizeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java?rev=931444&r1=931443&r2=931444&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java Wed Apr 7 06:31:41 2010
@@ -108,6 +108,7 @@ public class DefaultProducerTemplateTest
public void testRequestUsingDefaultEndpoint() throws Exception {
ProducerTemplate producer = new DefaultProducerTemplate(context, context.getEndpoint("direct:out"));
+ producer.start();
Object out = producer.requestBody("Hello");
assertEquals("Bye Bye World", out);
@@ -118,10 +119,13 @@ public class DefaultProducerTemplateTest
Map<String, Object> headers = new HashMap<String, Object>();
out = producer.requestBodyAndHeaders("Hello", headers);
assertEquals("Bye Bye World", out);
+
+ producer.stop();
}
public void testSendUsingDefaultEndpoint() throws Exception {
ProducerTemplate producer = new DefaultProducerTemplate(context, context.getEndpoint("direct:in"));
+ producer.start();
getMockEndpoint("mock:result").expectedMessageCount(3);
@@ -131,6 +135,8 @@ public class DefaultProducerTemplateTest
producer.sendBodyAndHeaders("Hello", headers);
assertMockEndpointsSatisfied();
+
+ producer.stop();
}
@Override
@@ -169,4 +175,43 @@ public class DefaultProducerTemplateTest
}
};
}
+
+ public void testCacheProducers() throws Exception {
+ ProducerTemplate template = new DefaultProducerTemplate(context);
+ template.setMaximumCacheSize(500);
+ template.start();
+
+ assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+
+ // test that we cache at most 500 producers to avoid it eating too much memory
+ for (int i = 0; i < 503; i++) {
+ Endpoint e = context.getEndpoint("direct:queue:" + i);
+ template.sendBody(e, "Hello");
+ }
+
+ assertEquals("Size should be 500", 500, template.getCurrentCacheSize());
+ template.stop();
+
+ // should be 0
+ assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+ }
+
+ public void testCacheProducersFromContext() throws Exception {
+ ProducerTemplate template = context.createProducerTemplate(500);
+
+ assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+
+ // test that we cache at most 500 producers to avoid it eating too much memory
+ for (int i = 0; i < 503; i++) {
+ Endpoint e = context.getEndpoint("direct:queue:" + i);
+ template.sendBody(e, "Hello");
+ }
+
+ assertEquals("Size should be 500", 500, template.getCurrentCacheSize());
+ template.stop();
+
+ // should be 0
+ assertEquals("Size should be 0", 0, template.getCurrentCacheSize());
+ }
+
}