You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/17 11:54:58 UTC

svn commit: r924220 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java main/java/org/apache/camel/processor/MulticastProcessor.java test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java

Author: davsclaus
Date: Wed Mar 17 10:54:57 2010
New Revision: 924220

URL: http://svn.apache.org/viewvc?rev=924220&view=rev
Log:
CAMEL-1588: Using default work queue capacity of 1000 to avoid queing too many tasks if consumers cannot keep up.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java?rev=924220&r1=924219&r2=924220&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java Wed Mar 17 10:54:57 2010
@@ -32,7 +32,7 @@ public class ThreadPoolProfileSupport im
     private Integer maxPoolSize = 20;
     private Long keepAliveTime = 60L;
     private TimeUnit timeUnit = TimeUnit.SECONDS;
-    private Integer maxQueueSize = -1;
+    private Integer maxQueueSize = 1000;
     private ThreadPoolRejectedPolicy rejectedPolicy;
 
     public Boolean isDefaultProfile() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=924220&r1=924219&r2=924220&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Mar 17 10:54:57 2010
@@ -101,7 +101,7 @@ public class MulticastProcessor extends 
     private final CamelContext camelContext;
     private Collection<Processor> processors;
     private final AggregationStrategy aggregationStrategy;
-    private final boolean isParallelProcessing;
+    private final boolean parallelProcessing;
     private final boolean streaming;
     private final boolean stopOnException;
     private final ExecutorService executorService;
@@ -121,10 +121,11 @@ public class MulticastProcessor extends 
         this.camelContext = camelContext;
         this.processors = processors;
         this.aggregationStrategy = aggregationStrategy;
-        this.isParallelProcessing = parallelProcessing;
         this.executorService = executorService;
         this.streaming = streaming;
         this.stopOnException = stopOnException;
+        // must enable parallel if executor service is provided
+        this.parallelProcessing = parallelProcessing || executorService != null;
     }
 
     @Override
@@ -403,7 +404,7 @@ public class MulticastProcessor extends 
     }
 
     public boolean isParallelProcessing() {
-        return isParallelProcessing;
+        return parallelProcessing;
     }
 
     public List<Processor> next() {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java?rev=924220&r1=924219&r2=924220&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java Wed Mar 17 10:54:57 2010
@@ -82,6 +82,27 @@ public class DefaultExecutorServiceStrat
         assertEquals(10, executor.getCorePoolSize());
         assertEquals(20, executor.getMaximumPoolSize());
         assertEquals(60, executor.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals(1000, executor.getQueue().remainingCapacity());
+
+        context.stop();
+        assertEquals(true, myPool.isShutdown());
+    }
+
+    public void testDefaultUnboundedQueueThreadPool() throws Exception {
+        ThreadPoolProfileSupport custom = new ThreadPoolProfileSupport();
+        custom.setMaxQueueSize(-1);
+
+        context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(custom);
+        assertEquals(true, custom.isDefaultProfile().booleanValue());
+
+        ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool");
+        assertEquals(false, myPool.isShutdown());
+
+        // should use default settings
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool;
+        assertEquals(10, executor.getCorePoolSize());
+        assertEquals(20, executor.getMaximumPoolSize());
+        assertEquals(60, executor.getKeepAliveTime(TimeUnit.SECONDS));
         assertEquals(Integer.MAX_VALUE, executor.getQueue().remainingCapacity());
 
         context.stop();