You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/17 11:54:58 UTC
svn commit: r924220 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
main/java/org/apache/camel/processor/MulticastProcessor.java
test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
Author: davsclaus
Date: Wed Mar 17 10:54:57 2010
New Revision: 924220
URL: http://svn.apache.org/viewvc?rev=924220&view=rev
Log:
CAMEL-1588: Using default work queue capacity of 1000 to avoid queuing too many tasks if consumers cannot keep up.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java?rev=924220&r1=924219&r2=924220&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java Wed Mar 17 10:54:57 2010
@@ -32,7 +32,7 @@ public class ThreadPoolProfileSupport im
private Integer maxPoolSize = 20;
private Long keepAliveTime = 60L;
private TimeUnit timeUnit = TimeUnit.SECONDS;
- private Integer maxQueueSize = -1;
+ private Integer maxQueueSize = 1000;
private ThreadPoolRejectedPolicy rejectedPolicy;
public Boolean isDefaultProfile() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=924220&r1=924219&r2=924220&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Mar 17 10:54:57 2010
@@ -101,7 +101,7 @@ public class MulticastProcessor extends
private final CamelContext camelContext;
private Collection<Processor> processors;
private final AggregationStrategy aggregationStrategy;
- private final boolean isParallelProcessing;
+ private final boolean parallelProcessing;
private final boolean streaming;
private final boolean stopOnException;
private final ExecutorService executorService;
@@ -121,10 +121,11 @@ public class MulticastProcessor extends
this.camelContext = camelContext;
this.processors = processors;
this.aggregationStrategy = aggregationStrategy;
- this.isParallelProcessing = parallelProcessing;
this.executorService = executorService;
this.streaming = streaming;
this.stopOnException = stopOnException;
+ // must enable parallel if executor service is provided
+ this.parallelProcessing = parallelProcessing || executorService != null;
}
@Override
@@ -403,7 +404,7 @@ public class MulticastProcessor extends
}
public boolean isParallelProcessing() {
- return isParallelProcessing;
+ return parallelProcessing;
}
public List<Processor> next() {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java?rev=924220&r1=924219&r2=924220&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java Wed Mar 17 10:54:57 2010
@@ -82,6 +82,27 @@ public class DefaultExecutorServiceStrat
assertEquals(10, executor.getCorePoolSize());
assertEquals(20, executor.getMaximumPoolSize());
assertEquals(60, executor.getKeepAliveTime(TimeUnit.SECONDS));
+ assertEquals(1000, executor.getQueue().remainingCapacity());
+
+ context.stop();
+ assertEquals(true, myPool.isShutdown());
+ }
+
+ public void testDefaultUnboundedQueueThreadPool() throws Exception {
+ ThreadPoolProfileSupport custom = new ThreadPoolProfileSupport();
+ custom.setMaxQueueSize(-1);
+
+ context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(custom);
+ assertEquals(true, custom.isDefaultProfile().booleanValue());
+
+ ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool");
+ assertEquals(false, myPool.isShutdown());
+
+ // should use default settings
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool;
+ assertEquals(10, executor.getCorePoolSize());
+ assertEquals(20, executor.getMaximumPoolSize());
+ assertEquals(60, executor.getKeepAliveTime(TimeUnit.SECONDS));
assertEquals(Integer.MAX_VALUE, executor.getQueue().remainingCapacity());
context.stop();