You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/16 10:24:47 UTC
svn commit: r923642 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/builder/
main/java/org/apache/camel/component/bean/ main/java/org/apache/camel/impl/
main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/
main/java/org/...
Author: davsclaus
Date: Tue Mar 16 09:24:46 2010
New Revision: 923642
URL: http://svn.apache.org/viewvc?rev=923642&view=rev
Log:
CAMEL-1588: Introduced ThreadPoolProfile with a sensible default used by EIPs. Introduced ThreadPoolBuilder to easily create pools from Java.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java?rev=923642&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java Tue Mar 16 09:24:46 2010
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.model.ThreadPoolProfileDefinition;
+
+/**
+ * A builder to create thread pools.
+ * <p/>
+ * The builder uses the default thread pool profile of the given
+ * {@link CamelContext} as its base; each fluent method overrides one
+ * setting before the pool is created with one of the <tt>build</tt> methods.
+ *
+ * @version $Revision$
+ */
+public final class ThreadPoolBuilder {
+
+    private final CamelContext camelContext;
+    // never reassigned; only mutated through its own fluent setters
+    private final ThreadPoolProfileDefinition threadPoolDefinition;
+
+    /**
+     * Creates a builder based on the default thread pool profile.
+     *
+     * @param camelContext the camel context whose executor service strategy is used
+     */
+    public ThreadPoolBuilder(CamelContext camelContext) {
+        this.camelContext = camelContext;
+        // use the default thread profile as the base
+        this.threadPoolDefinition = new ThreadPoolProfileDefinition(camelContext.getExecutorServiceStrategy().getDefaultThreadPoolProfile());
+    }
+
+    /**
+     * Sets the pool size, passed as the core pool size when the pool is built.
+     *
+     * @param poolSize the pool size
+     * @return the builder
+     */
+    public ThreadPoolBuilder poolSize(int poolSize) {
+        threadPoolDefinition.poolSize(poolSize);
+        return this;
+    }
+
+    /**
+     * Sets the maximum pool size.
+     *
+     * @param maxPoolSize the maximum pool size
+     * @return the builder
+     */
+    public ThreadPoolBuilder maxPoolSize(int maxPoolSize) {
+        threadPoolDefinition.maxPoolSize(maxPoolSize);
+        return this;
+    }
+
+    /**
+     * Sets the keep alive time, expressed in the unit set by {@link #timeUnit(TimeUnit)}.
+     *
+     * @param keepAliveTime the keep alive time
+     * @return the builder
+     */
+    public ThreadPoolBuilder keepAliveTime(long keepAliveTime) {
+        threadPoolDefinition.keepAliveTime(keepAliveTime);
+        return this;
+    }
+
+    /**
+     * Sets the time unit of the keep alive time.
+     *
+     * @param timeUnit the time unit
+     * @return the builder
+     */
+    public ThreadPoolBuilder timeUnit(TimeUnit timeUnit) {
+        threadPoolDefinition.timeUnit(timeUnit);
+        return this;
+    }
+
+    /**
+     * Sets the maximum queue size.
+     *
+     * @param maxQueueSize the maximum queue size
+     * @return the builder
+     */
+    public ThreadPoolBuilder maxQueueSize(int maxQueueSize) {
+        threadPoolDefinition.maxQueueSize(maxQueueSize);
+        return this;
+    }
+
+    /**
+     * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}.
+     *
+     * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+     * @param executorServiceRef reference to lookup
+     * @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt> if not found
+     */
+    public ExecutorService lookup(Object source, String executorServiceRef) {
+        return camelContext.getExecutorServiceStrategy().lookup(source, executorServiceRef);
+    }
+
+    /**
+     * Builds the new thread pool, using <tt>null</tt> as the source object.
+     *
+     * @param name name which is appended to the thread name
+     * @return the created thread pool
+     */
+    public ExecutorService build(String name) {
+        return build(null, name);
+    }
+
+    /**
+     * Builds the new thread pool from the settings collected by this builder.
+     *
+     * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+     * @param name name which is appended to the thread name
+     * @return the created thread pool
+     */
+    public ExecutorService build(Object source, String name) {
+        // the trailing false is the last (boolean) argument of newThreadPool
+        return camelContext.getExecutorServiceStrategy().newThreadPool(source, name,
+                threadPoolDefinition.getPoolSize(), threadPoolDefinition.getMaxPoolSize(),
+                threadPoolDefinition.getKeepAliveTime(), threadPoolDefinition.getTimeUnit(),
+                threadPoolDefinition.getMaxQueueSize(), false);
+    }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java Tue Mar 16 09:24:46 2010
@@ -91,8 +91,8 @@ public class MethodInfo {
}
if (annotation.parallelProcessoing() && recipientList.getExecutorService() == null) {
- // we are running in parallel so create a cached thread pool which grows/shrinks automatic
- ExecutorService executor = camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, "@RecipientList");
+ // we are running in parallel so we need a thread pool
+ ExecutorService executor = camelContext.getExecutorServiceStrategy().newDefaultThreadPool(this, "@RecipientList");
recipientList.setExecutorService(executor);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Tue Mar 16 09:24:46 2010
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.spi.ExecutorServiceStrategy;
+import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,9 +38,26 @@ public class DefaultExecutorServiceStrat
private final List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
private final CamelContext camelContext;
private String threadNamePattern = "Camel Thread ${counter} - ${name}";
+ private ThreadPoolProfile defaultThreadPoolProfile;
public DefaultExecutorServiceStrategy(CamelContext camelContext) {
this.camelContext = camelContext;
+ this.defaultThreadPoolProfile = new ThreadPoolProfileSupport();
+ this.defaultThreadPoolProfile.setDefaultProfile(true);
+ }
+
+ public ThreadPoolProfile getDefaultThreadPoolProfile() {
+ return defaultThreadPoolProfile;
+ }
+
+ public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) {
+ // the old profile is no longer the default
+ if (this.defaultThreadPoolProfile != null) {
+ this.defaultThreadPoolProfile.setDefaultProfile(false);
+ }
+ // and replace with the new default profile
+ this.defaultThreadPoolProfile = defaultThreadPoolProfile;
+ this.defaultThreadPoolProfile.setDefaultProfile(true);
}
public String getThreadName(String name) {
@@ -61,6 +79,15 @@ public class DefaultExecutorServiceStrat
return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
}
+ public ExecutorService newDefaultThreadPool(Object source, String name) {
+ ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name,
+ defaultThreadPoolProfile.getPoolSize(), defaultThreadPoolProfile.getMaxPoolSize(),
+ defaultThreadPoolProfile.getKeepAliveTime(), defaultThreadPoolProfile.getTimeUnit(),
+ defaultThreadPoolProfile.getMaxQueueSize(), false);
+ onNewExecutorService(answer);
+ return answer;
+ }
+
public ExecutorService newCachedThreadPool(Object source, String name) {
ExecutorService answer = ExecutorServiceHelper.newCachedThreadPool(threadNamePattern, name, true);
onNewExecutorService(answer);
@@ -91,8 +118,10 @@ public class DefaultExecutorServiceStrat
return answer;
}
- public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
- ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
+ public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime,
+ TimeUnit timeUnit, int maxQueueSize, boolean daemon) {
+ ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize,
+ keepAliveTime, timeUnit, maxQueueSize, daemon);
onNewExecutorService(answer);
return answer;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Tue Mar 16 09:24:46 2010
@@ -53,7 +53,7 @@ public class DefaultProducerTemplate ext
public DefaultProducerTemplate(CamelContext context) {
this.context = context;
this.producerCache = new ProducerCache(context);
- this.executor = context.getExecutorServiceStrategy().newCachedThreadPool(this, "ProducerTemplate");
+ this.executor = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "ProducerTemplate");
}
public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
@@ -683,7 +683,7 @@ public class DefaultProducerTemplate ext
super.start();
ServiceHelper.startService(producerCache);
if (executor == null) {
- executor = context.getExecutorServiceStrategy().newCachedThreadPool(this, "ProducerTemplate");
+ executor = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "ProducerTemplate");
}
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java?rev=923642&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java Tue Mar 16 09:24:46 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.spi.ThreadPoolProfile;
+
+/**
+ * Plain bean implementation of {@link ThreadPoolProfile}.
+ * <p/>
+ * A new instance is pre-populated with a pool size of 10, a maximum pool size
+ * of 20, a keep alive time of 60 {@link TimeUnit#SECONDS} and a maximum queue
+ * size of -1 (NOTE(review): -1 presumably means unbounded - confirm against
+ * ExecutorServiceHelper).
+ *
+ * @version $Revision$
+ */
+public class ThreadPoolProfileSupport implements ThreadPoolProfile {
+
+    private Boolean defaultProfile;
+    private Integer poolSize = 10;
+    private Integer maxPoolSize = 20;
+    private Long keepAliveTime = 60L;
+    private TimeUnit timeUnit = TimeUnit.SECONDS;
+    private Integer maxQueueSize = -1;
+
+    /**
+     * Whether this profile is the default profile.
+     *
+     * @return <tt>true</tt> if flagged as default, <tt>false</tt> if not or if the flag was never set
+     */
+    public Boolean isDefaultProfile() {
+        // the flag stays null until explicitly set; report that as false so
+        // callers can safely unbox the result
+        return defaultProfile != null && defaultProfile;
+    }
+
+    public void setDefaultProfile(Boolean defaultProfile) {
+        this.defaultProfile = defaultProfile;
+    }
+
+    public Integer getPoolSize() {
+        return poolSize;
+    }
+
+    public void setPoolSize(Integer poolSize) {
+        this.poolSize = poolSize;
+    }
+
+    public Integer getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    public void setMaxPoolSize(Integer maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }
+
+    public Long getKeepAliveTime() {
+        return keepAliveTime;
+    }
+
+    public void setKeepAliveTime(Long keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+
+    public void setTimeUnit(TimeUnit timeUnit) {
+        this.timeUnit = timeUnit;
+    }
+
+    public Integer getMaxQueueSize() {
+        return maxQueueSize;
+    }
+
+    public void setMaxQueueSize(Integer maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+    }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Tue Mar 16 09:24:46 2010
@@ -154,7 +154,7 @@ public class AggregateDefinition extends
if (executorService == null) {
if (isParallelProcessing()) {
// we are running in parallel so create a cached thread pool which grows/shrinks automatic
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "Aggregator");
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Aggregator");
} else {
// use a single threaded if we are not running in parallel
executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this, "Aggregator");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Tue Mar 16 09:24:46 2010
@@ -151,7 +151,7 @@ public class MulticastDefinition extends
executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
if (isParallelProcessing() && executorService == null) {
// we are running in parallel so create a cached thread pool which grows/shrinks automatic
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "Multicast");
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Multicast");
}
return new MulticastProcessor(routeContext.getCamelContext(), list, aggregationStrategy, isParallelProcessing(),
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Tue Mar 16 09:24:46 2010
@@ -99,7 +99,7 @@ public class OnCompletionDefinition exte
executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
if (executorService == null) {
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "OnCompletion");
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "OnCompletion");
}
OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor, executorService,
onCompleteOnly, onFailureOnly, when);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java Tue Mar 16 09:24:46 2010
@@ -366,7 +366,7 @@ public class OnExceptionDefinition exten
* Sets whether to log exhausted exceptions
*/
public OnExceptionDefinition logExhausted(boolean logExhausted) {
- getOrCreateRedeliveryPolicy().setLogExhasted(logExhausted);
+ getOrCreateRedeliveryPolicy().setLogExhausted(logExhausted);
return this;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Tue Mar 16 09:24:46 2010
@@ -98,7 +98,7 @@ public class RecipientListDefinition ext
executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
if (isParallelProcessing() && executorService == null) {
// we are running in parallel so create a cached thread pool which grows/shrinks automatic
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "RecipientList");
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "RecipientList");
}
answer.setExecutorService(executorService);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java Tue Mar 16 09:24:46 2010
@@ -63,7 +63,7 @@ public class RedeliveryPolicyDefinition
@XmlAttribute
private Boolean logHandled;
@XmlAttribute
- private Boolean logExhasted;
+ private Boolean logExhausted;
@XmlAttribute
private Boolean disableRedelivery;
@@ -115,8 +115,8 @@ public class RedeliveryPolicyDefinition
if (logRetryAttempted != null) {
answer.setLogRetryAttempted(logRetryAttempted);
}
- if (logExhasted != null) {
- answer.setLogExhausted(logExhasted);
+ if (logExhausted != null) {
+ answer.setLogExhausted(logExhausted);
}
if (disableRedelivery != null && disableRedelivery) {
answer.setMaximumRedeliveries(0);
@@ -253,7 +253,7 @@ public class RedeliveryPolicyDefinition
* @return the builder
*/
public RedeliveryPolicyDefinition logExhausted(boolean logExhausted) {
- setLogExhasted(logExhausted);
+ setLogExhausted(logExhausted);
return this;
}
@@ -438,11 +438,11 @@ public class RedeliveryPolicyDefinition
this.logRetryAttempted = logRetryAttempted;
}
- public Boolean isLogExhasted() {
- return logExhasted;
+ public Boolean isLogExhausted() {
+ return logExhausted;
}
- public void setLogExhasted(Boolean logExhasted) {
- this.logExhasted = logExhasted;
+ public void setLogExhausted(Boolean logExhausted) {
+ this.logExhausted = logExhausted;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Tue Mar 16 09:24:46 2010
@@ -91,7 +91,7 @@ public class SplitDefinition extends Exp
executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
if (isParallelProcessing() && executorService == null) {
// we are running in parallel so create a cached thread pool which grows/shrinks automatic
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "Split");
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Split");
}
Expression exp = getExpression().createExpression(routeContext);
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java?rev=923642&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java Tue Mar 16 09:24:46 2010
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.camel.builder.xml.TimeUnitAdapter;
+import org.apache.camel.spi.ThreadPoolProfile;
+
+/**
+ * Represents an XML &lt;threadPoolProfile/&gt; element
+ * <p/>
+ * All settings are bound as XML attributes. A new instance is pre-populated
+ * with: not the default profile, pool size 10, maximum pool size 20, keep
+ * alive time 60 {@link TimeUnit#SECONDS} and maximum queue size -1.
+ *
+ * @version $Revision$
+ */
+@XmlRootElement(name = "threadPoolProfile")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ThreadPoolProfileDefinition extends OptionalIdentifiedDefinition implements ThreadPoolProfile {
+
+    @XmlAttribute()
+    private Boolean defaultProfile = false;
+    @XmlAttribute()
+    private Integer poolSize = 10;
+    @XmlAttribute()
+    private Integer maxPoolSize = 20;
+    @XmlAttribute()
+    private Long keepAliveTime = 60L;
+    // bound as an attribute like the other settings; without @XmlAttribute
+    // FIELD access would map it to a child element instead
+    @XmlAttribute()
+    @XmlJavaTypeAdapter(TimeUnitAdapter.class)
+    private TimeUnit timeUnit = TimeUnit.SECONDS;
+    @XmlAttribute()
+    private Integer maxQueueSize = -1;
+
+    public ThreadPoolProfileDefinition() {
+    }
+
+    /**
+     * Creates a definition holding a copy of the settings of the given profile.
+     *
+     * @param threadPoolProfile the profile to copy the settings from
+     */
+    public ThreadPoolProfileDefinition(ThreadPoolProfile threadPoolProfile) {
+        setDefaultProfile(threadPoolProfile.isDefaultProfile());
+        setPoolSize(threadPoolProfile.getPoolSize());
+        setMaxPoolSize(threadPoolProfile.getMaxPoolSize());
+        setKeepAliveTime(threadPoolProfile.getKeepAliveTime());
+        setTimeUnit(threadPoolProfile.getTimeUnit());
+        setMaxQueueSize(threadPoolProfile.getMaxQueueSize());
+    }
+
+    /**
+     * Sets the pool size.
+     *
+     * @param poolSize the pool size
+     * @return the builder
+     */
+    public ThreadPoolProfileDefinition poolSize(int poolSize) {
+        setPoolSize(poolSize);
+        return this;
+    }
+
+    /**
+     * Sets the maximum pool size.
+     *
+     * @param maxPoolSize the maximum pool size
+     * @return the builder
+     */
+    public ThreadPoolProfileDefinition maxPoolSize(int maxPoolSize) {
+        setMaxPoolSize(maxPoolSize);
+        return this;
+    }
+
+    /**
+     * Sets the keep alive time, expressed in the configured time unit.
+     *
+     * @param keepAliveTime the keep alive time
+     * @return the builder
+     */
+    public ThreadPoolProfileDefinition keepAliveTime(long keepAliveTime) {
+        setKeepAliveTime(keepAliveTime);
+        return this;
+    }
+
+    /**
+     * Sets the time unit of the keep alive time.
+     *
+     * @param timeUnit the time unit
+     * @return the builder
+     */
+    public ThreadPoolProfileDefinition timeUnit(TimeUnit timeUnit) {
+        setTimeUnit(timeUnit);
+        return this;
+    }
+
+    /**
+     * Sets the maximum queue size.
+     *
+     * @param maxQueueSize the maximum queue size
+     * @return the builder
+     */
+    public ThreadPoolProfileDefinition maxQueueSize(int maxQueueSize) {
+        setMaxQueueSize(maxQueueSize);
+        return this;
+    }
+
+    /**
+     * Whether this profile is the default profile.
+     *
+     * @return <tt>true</tt> if flagged as default, <tt>false</tt> if not or if the flag is unset
+     */
+    public Boolean isDefaultProfile() {
+        // the flag can be null when copied from another profile or set explicitly
+        return defaultProfile != null && defaultProfile;
+    }
+
+    public void setDefaultProfile(Boolean defaultProfile) {
+        this.defaultProfile = defaultProfile;
+    }
+
+    public Integer getPoolSize() {
+        return poolSize;
+    }
+
+    public void setPoolSize(Integer poolSize) {
+        this.poolSize = poolSize;
+    }
+
+    public Integer getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    public void setMaxPoolSize(Integer maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }
+
+    public Long getKeepAliveTime() {
+        return keepAliveTime;
+    }
+
+    public void setKeepAliveTime(Long keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+
+    public void setTimeUnit(TimeUnit timeUnit) {
+        this.timeUnit = timeUnit;
+    }
+
+    public Integer getMaxQueueSize() {
+        return maxQueueSize;
+    }
+
+    public void setMaxQueueSize(Integer maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+    }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Tue Mar 16 09:24:46 2010
@@ -44,20 +44,20 @@ public class ThreadsDefinition extends O
@XmlTransient
private ExecutorService executorService;
- @XmlAttribute(required = false)
+ @XmlAttribute
private String executorServiceRef;
- @XmlAttribute(required = false)
+ @XmlAttribute
private Integer poolSize;
- @XmlAttribute(required = false)
+ @XmlAttribute
private Integer maxPoolSize;
- @XmlAttribute(required = false)
+ @XmlAttribute
private Integer keepAliveTime = 60;
- @XmlAttribute(required = false)
+ @XmlAttribute
@XmlJavaTypeAdapter(TimeUnitAdapter.class)
private TimeUnit units = TimeUnit.SECONDS;
- @XmlAttribute(required = false)
+ @XmlAttribute
private String threadName;
- @XmlAttribute(required = false)
+ @XmlAttribute
private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
@Override
@@ -69,12 +69,12 @@ public class ThreadsDefinition extends O
String name = getThreadName() != null ? getThreadName() : "Threads";
if (poolSize == null || poolSize <= 0) {
// use the cached thread pool
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, name);
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, name);
} else {
// use a custom pool based on the settings
int max = getMaxPoolSize() != null ? getMaxPoolSize() : poolSize;
executorService = routeContext.getCamelContext().getExecutorServiceStrategy()
- .newThreadPool(this, name, poolSize, max, getKeepAliveTime(), getUnits(), true);
+ .newThreadPool(this, name, poolSize, max, getKeepAliveTime(), getUnits(), -1, true);
}
}
Processor childProcessor = routeContext.createProcessor(this);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Tue Mar 16 09:24:46 2010
@@ -69,7 +69,7 @@ public class WireTapDefinition extends S
executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, this);
if (executorService == null) {
- executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "WireTap");
+ executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "WireTap");
}
WireTapProcessor answer = new WireTapProcessor(endpoint, getPattern(), executorService);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Tue Mar 16 09:24:46 2010
@@ -188,7 +188,7 @@ public class SendAsyncProcessor extends
if (producerExecutorService == null) {
// use a cached pool for the producers which can grow/schrink itself
producerExecutorService = destination.getCamelContext().getExecutorServiceStrategy()
- .newCachedThreadPool(this, "SendAsyncProcessor-Producer");
+ .newDefaultThreadPool(this, "SendAsyncProcessor-Producer");
}
return producerExecutorService;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Tue Mar 16 09:24:46 2010
@@ -33,12 +33,29 @@ import org.apache.camel.ShutdownableServ
* do not have to exactly create those kind of pools. Feel free to return a shared or different kind of pool.
* <p/>
* However there are two types of pools: regular and scheduled.
+ * <p/>
+ * If you use the <tt>newXXX</tt> methods to create thread pools, then Camel will by default take care of
+ * shutting down those created pools when {@link org.apache.camel.CamelContext} is shutting down.
*
* @version $Revision$
*/
public interface ExecutorServiceStrategy extends ShutdownableService {
/**
+ * Gets the default thread pool profile
+ *
+ * @return the default profile
+ */
+ ThreadPoolProfile getDefaultThreadPoolProfile();
+
+ /**
+ * Sets the default thread pool profile
+ *
+ * @param defaultThreadPoolProfile the new default thread pool profile
+ */
+ void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile);
+
+ /**
* Creates a full thread name
*
* @param name name which is appended to the full thread name
@@ -76,11 +93,20 @@ public interface ExecutorServiceStrategy
ExecutorService lookup(Object source, String executorServiceRef);
/**
+ * Creates a new thread pool using the default thread pool profile.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @return the created thread pool
+ */
+ ExecutorService newDefaultThreadPool(Object source, String name);
+
+ /**
* Creates a new cached thread pool.
*
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
- * @return the thread pool
+ * @return the created thread pool
*/
ExecutorService newCachedThreadPool(Object source, String name);
@@ -90,7 +116,7 @@ public interface ExecutorServiceStrategy
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
* @param poolSize the core pool size
- * @return the thread pool
+ * @return the created thread pool
*/
ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize);
@@ -100,7 +126,7 @@ public interface ExecutorServiceStrategy
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
* @param poolSize the core pool size
- * @return the thread pool
+ * @return the created thread pool
*/
ExecutorService newFixedThreadPool(Object source, String name, int poolSize);
@@ -109,7 +135,7 @@ public interface ExecutorServiceStrategy
*
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
- * @return the thread pool
+ * @return the created thread pool
*/
ExecutorService newSingleThreadExecutor(Object source, String name);
@@ -122,7 +148,7 @@ public interface ExecutorServiceStrategy
* @param name name which is appended to the thread name
* @param corePoolSize the core pool size
* @param maxPoolSize the maximum pool size
- * @return the thread pool
+ * @return the created thread pool
*/
ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize);
@@ -135,11 +161,12 @@ public interface ExecutorServiceStrategy
* @param maxPoolSize the maximum pool size
* @param keepAliveTime keep alive time for idle threads
* @param timeUnit time unit for keep alive time
+ * @param maxQueueSize the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
* @param daemon whether or not the created threads is daemon or not
- * @return the thread pool
+ * @return the created thread pool
*/
ExecutorService newThreadPool(Object source, final String name, int corePoolSize, int maxPoolSize,
- long keepAliveTime, TimeUnit timeUnit, boolean daemon);
+ long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean daemon);
/**
* Shutdown the given executor service.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java?rev=923642&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java Tue Mar 16 09:24:46 2010
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A profile which defines thread pool settings.
+ *
+ * @version $Revision$
+ */
+public interface ThreadPoolProfile {
+
+ /**
+ * Whether this profile is the default profile (there can only be one).
+ *
+ * @return <tt>true</tt> if it is the default profile, <tt>false</tt> otherwise
+ */
+ Boolean isDefaultProfile();
+
+ /**
+ * Sets whether this profile is the default profile (there can only be one).
+ *
+ * @param defaultProfile the option
+ */
+ void setDefaultProfile(Boolean defaultProfile);
+
+ /**
+ * Gets the core pool size (threads to keep minimum in pool)
+ *
+ * @return the pool size
+ */
+ Integer getPoolSize();
+
+ /**
+ * Sets the core pool size (threads to keep minimum in pool)
+ *
+ * @param poolSize the pool size
+ */
+ void setPoolSize(Integer poolSize);
+
+ /**
+ * Gets the maximum pool size
+ *
+ * @return the maximum pool size
+ */
+ Integer getMaxPoolSize();
+
+ /**
+ * Sets the maximum pool size
+ *
+ * @param maxPoolSize the maximum pool size
+ */
+ void setMaxPoolSize(Integer maxPoolSize);
+
+ /**
+ * Gets the keep alive time for inactive threads
+ *
+ * @return the keep alive time
+ */
+ Long getKeepAliveTime();
+
+ /**
+ * Sets the keep alive time for inactive threads
+ *
+ * @param keepAliveTime the keep alive time
+ */
+ void setKeepAliveTime(Long keepAliveTime);
+
+ /**
+ * Gets the time unit used for keep alive time
+ *
+ * @return the time unit
+ */
+ TimeUnit getTimeUnit();
+
+ /**
+ * Sets the time unit used for keep alive time
+ *
+ * @param timeUnit the time unit
+ */
+ void setTimeUnit(TimeUnit timeUnit);
+
+ /**
+ * Gets the maximum number of tasks in the work queue.
+ * <p/>
+ * Use <tt>-1</tt> or <tt>Integer.MAX_VALUE</tt> for an unbounded queue
+ *
+ * @return the max queue size
+ */
+ Integer getMaxQueueSize();
+
+ /**
+ * Sets the maximum number of tasks in the work queue.
+ * <p/>
+ * Use <tt>-1</tt> or <tt>Integer.MAX_VALUE</tt> for an unbounded queue
+ *
+ * @param maxQueueSize the max queue size
+ */
+ void setMaxQueueSize(Integer maxQueueSize);
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Tue Mar 16 09:24:46 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.util.concurrent;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -132,7 +133,7 @@ public final class ExecutorServiceHelper
}
/**
- * Creates a new cached thread pool which should be the most commonly used.
+ * Creates a new cached thread pool
*
* @param pattern pattern of the thread name
* @param name ${name} in the pattern name
@@ -150,7 +151,7 @@ public final class ExecutorServiceHelper
}
/**
- * Creates a new custom thread pool using 60 seconds as keep alive
+ * Creates a new custom thread pool using 60 seconds as keep alive and with an unbounded queue.
*
* @param pattern pattern of the thread name
* @param name ${name} in the pattern name
@@ -159,7 +160,7 @@ public final class ExecutorServiceHelper
* @return the created pool
*/
public static ExecutorService newThreadPool(final String pattern, final String name, int corePoolSize, int maxPoolSize) {
- return ExecutorServiceHelper.newThreadPool(pattern, name, corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, true);
+ return ExecutorServiceHelper.newThreadPool(pattern, name, corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, -1, true);
}
/**
@@ -171,20 +172,30 @@ public final class ExecutorServiceHelper
* @param maxPoolSize the maximum pool size
* @param keepAliveTime keep alive
* @param timeUnit keep alive time unit
+ * @param maxQueueSize the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
* @param daemon whether the threads is daemon or not
* @return the created pool
* @throws IllegalArgumentException if parameters is not valid
*/
public static ExecutorService newThreadPool(final String pattern, final String name, int corePoolSize, int maxPoolSize,
- long keepAliveTime, TimeUnit timeUnit, final boolean daemon) {
+ long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, final boolean daemon) {
// validate max >= core
if (maxPoolSize < corePoolSize) {
throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
}
- ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
- keepAliveTime, timeUnit, new LinkedBlockingQueue<Runnable>());
+ if (maxQueueSize == 0) {
+ throw new IllegalArgumentException("MaxQueueSize cannot be 0.");
+ }
+
+ BlockingQueue<Runnable> queue;
+ if (maxQueueSize < 0) {
+ queue = new LinkedBlockingQueue<Runnable>();
+ } else {
+ queue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
+ }
+ ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, queue);
answer.setThreadFactory(new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread answer = new Thread(r, getThreadName(pattern, name));
Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Tue Mar 16 09:24:46 2010
@@ -69,6 +69,7 @@ SetPropertyDefinition
SortDefinition
SplitDefinition
StopDefinition
+ThreadPoolProfileDefinition
ThreadsDefinition
ThrottleDefinition
ThrowExceptionDefinition
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java?rev=923642&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java Tue Mar 16 09:24:46 2010
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.impl.JndiRegistry;
+
+/**
+ * @version $Revision$
+ */
+public class ThreadPoolBuilderTest extends ContextTestSupport {
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ ExecutorService someone = Executors.newCachedThreadPool();
+ jndi.bind("someonesPool", someone);
+ return jndi;
+ }
+
+ public void testThreadPoolBuilderLookup() throws Exception {
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.lookup(this, "someonesPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ // you need to manage this pool yourself
+ assertEquals(false, executor.isShutdown());
+ }
+
+ public void testThreadPoolBuilderDefault() throws Exception {
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
+ }
+
+ public void testThreadPoolBuilderMaxQueueSize() throws Exception {
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.maxQueueSize(2000).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
+ }
+
+ public void testThreadPoolBuilderMax() throws Exception {
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.maxPoolSize(100).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
+ }
+
+ public void testThreadPoolBuilderCoreAndMax() throws Exception {
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.poolSize(50).maxPoolSize(100).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
+ }
+
+ public void testThreadPoolBuilderKeepAlive() throws Exception {
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.keepAliveTime(30).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
+ }
+
+ public void testThreadPoolBuilderKeepAliveTimeUnit() throws Exception {
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.keepAliveTime(20000).timeUnit(TimeUnit.MILLISECONDS).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
+ }
+
+ public void testThreadPoolBuilderAll() throws Exception {
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.poolSize(50).maxPoolSize(100).maxQueueSize(2000)
+ .keepAliveTime(20000).timeUnit(TimeUnit.MILLISECONDS).build(this, "myPool");
+ assertNotNull(executor);
+
+ assertEquals(false, executor.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
+ }
+
+ public void testThreadPoolBuilderTwoPoolsDefault() throws Exception {
+ ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
+ ExecutorService executor = builder.build(this, "myPool");
+ ExecutorService executor2 = builder.build(this, "myOtherPool");
+
+ assertNotNull(executor);
+ assertNotNull(executor2);
+
+ assertEquals(false, executor.isShutdown());
+ assertEquals(false, executor2.isShutdown());
+ context.stop();
+ assertEquals(true, executor.isShutdown());
+ assertEquals(true, executor2.isShutdown());
+ }
+
+
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java Tue Mar 16 09:24:46 2010
@@ -16,6 +16,10 @@
*/
package org.apache.camel.impl;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.ContextTestSupport;
/**
@@ -68,4 +72,44 @@ public class DefaultExecutorServiceStrat
// reset it so we can shutdown properly
context.getExecutorServiceStrategy().setThreadNamePattern("Camel Thread ${counter} - ${name}");
}
+
+ public void testDefaultThreadPool() throws Exception {
+ ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool");
+ assertEquals(false, myPool.isShutdown());
+
+ // should use default settings
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool;
+ assertEquals(10, executor.getCorePoolSize());
+ assertEquals(20, executor.getMaximumPoolSize());
+ assertEquals(60, executor.getKeepAliveTime(TimeUnit.SECONDS));
+ assertEquals(Integer.MAX_VALUE, executor.getQueue().remainingCapacity());
+
+ context.stop();
+ assertEquals(true, myPool.isShutdown());
+ }
+
+ public void testCustomDefaultThreadPool() throws Exception {
+ ThreadPoolProfileSupport custom = new ThreadPoolProfileSupport();
+ custom.setKeepAliveTime(20L);
+ custom.setMaxPoolSize(40);
+ custom.setPoolSize(5);
+ custom.setMaxQueueSize(2000);
+
+ context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(custom);
+ assertEquals(true, custom.isDefaultProfile().booleanValue());
+
+ ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool");
+ assertEquals(false, myPool.isShutdown());
+
+ // should use the settings from the custom default profile
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool;
+ assertEquals(5, executor.getCorePoolSize());
+ assertEquals(40, executor.getMaximumPoolSize());
+ assertEquals(20, executor.getKeepAliveTime(TimeUnit.SECONDS));
+ assertEquals(2000, executor.getQueue().remainingCapacity());
+
+ context.stop();
+ assertEquals(true, myPool.isShutdown());
+ }
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java?rev=923642&r1=923641&r2=923642&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java Tue Mar 16 09:24:46 2010
@@ -94,7 +94,7 @@ public class AggregateShutdownThreadPool
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- myPool = context.getExecutorServiceStrategy().newCachedThreadPool(this, "myPool");
+ myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool");
from("direct:foo").routeId("foo")
.aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(3)