You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/06 09:25:00 UTC
svn commit: r919713 - in
/camel/trunk/camel-core/src/main/java/org/apache/camel: impl/ management/
model/ processor/ spi/
Author: davsclaus
Date: Sat Mar 6 08:24:59 2010
New Revision: 919713
URL: http://svn.apache.org/viewvc?rev=919713&view=rev
Log:
CAMEL-1437: Added SPI ExecutorServiceStrategy to abstract how to use thread pools with Camel.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sat Mar 6 08:24:59 2010
@@ -1556,7 +1556,7 @@
} else {
try {
LOG.info("JMX enabled. Using DefaultManagedLifecycleStrategy.");
- answer = new ManagedManagementStrategy(new DefaultManagementAgent());
+ answer = new ManagedManagementStrategy(new DefaultManagementAgent(this));
// prefer to have it at first strategy
lifecycleStrategies.add(0, new DefaultManagementLifecycleStrategy(this));
} catch (NoClassDefFoundError e) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Sat Mar 6 08:24:59 2010
@@ -36,32 +36,36 @@
this.camelContext = camelContext;
}
+ public String getThreadName(String nameSuffix) {
+ return ExecutorServiceHelper.getThreadName(nameSuffix);
+ }
+
public ExecutorService lookup(String executorServiceRef) {
return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
}
- public ExecutorService newCachedThreadPool(String name) {
- return ExecutorServiceHelper.newCachedThreadPool(name, true);
+ public ExecutorService newCachedThreadPool(String nameSuffix) {
+ return ExecutorServiceHelper.newCachedThreadPool(nameSuffix, true);
}
- public ScheduledExecutorService newScheduledThreadPool(String name, int poolSize) {
- return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, true);
+ public ScheduledExecutorService newScheduledThreadPool(String nameSuffix, int poolSize) {
+ return ExecutorServiceHelper.newScheduledThreadPool(poolSize, nameSuffix, true);
}
- public ExecutorService newFixedThreadPool(String name, int poolSize) {
- return ExecutorServiceHelper.newFixedThreadPool(poolSize, name, true);
+ public ExecutorService newFixedThreadPool(String nameSuffix, int poolSize) {
+ return ExecutorServiceHelper.newFixedThreadPool(poolSize, nameSuffix, true);
}
- public ExecutorService newSingleThreadExecutor(String name) {
- return ExecutorServiceHelper.newSingleThreadExecutor(name, true);
+ public ExecutorService newSingleThreadExecutor(String nameSuffix) {
+ return ExecutorServiceHelper.newSingleThreadExecutor(nameSuffix, true);
}
- public ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize) {
- return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize);
+ public ExecutorService newThreadPool(String nameSuffix, int corePoolSize, int maxPoolSize) {
+ return ExecutorServiceHelper.newThreadPool(nameSuffix, corePoolSize, maxPoolSize);
}
- public ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
- return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
+ public ExecutorService newThreadPool(String nameSuffix, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
+ return ExecutorServiceHelper.newThreadPool(nameSuffix, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
}
public void shutdown(ExecutorService executorService) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Sat Mar 6 08:24:59 2010
@@ -240,7 +240,7 @@
@Override
protected void doStart() throws Exception {
- ObjectHelper.notNull(camelContext, "CamelContext must be set");
+ ObjectHelper.notNull(camelContext, "CamelContext");
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java Sat Mar 6 08:24:59 2010
@@ -39,9 +39,11 @@
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ManagementAgent;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jmx.export.annotation.AnnotationJmxAttributeSource;
@@ -50,7 +52,7 @@
/**
* Default implementation of the Camel JMX service agent
*/
-public class DefaultManagementAgent extends ServiceSupport implements ManagementAgent {
+public class DefaultManagementAgent extends ServiceSupport implements ManagementAgent, CamelContextAware {
public static final String DEFAULT_DOMAIN = "org.apache.camel";
public static final String DEFAULT_HOST = "localhost";
@@ -59,6 +61,7 @@
public static final String DEFAULT_SERVICE_URL_PATH = "/jmxrmi/camel";
private static final transient Log LOG = LogFactory.getLog(DefaultManagementAgent.class);
+ private CamelContext camelContext;
private ExecutorService executorService;
private MBeanServer server;
private final Set<ObjectName> mbeansRegistered = new HashSet<ObjectName>();
@@ -74,6 +77,13 @@
private Boolean createConnector;
private Boolean onlyRegisterProcessorWithCustomId;
+ public DefaultManagementAgent() {
+ }
+
+ public DefaultManagementAgent(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
protected void finalizeSettings() {
if (registryPort == null) {
registryPort = Integer.getInteger(JmxSystemPropertyKeys.REGISTRY_PORT, DEFAULT_REGISTRY_PORT);
@@ -189,6 +199,14 @@
this.executorService = executorService;
}
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
public void register(Object obj, ObjectName name) throws JMException {
register(obj, name, false);
}
@@ -224,6 +242,7 @@
}
protected void doStart() throws Exception {
+ ObjectHelper.notNull(camelContext, "CamelContext");
assembler = new MetadataMBeanInfoAssembler();
assembler.setAttributeSource(new AnnotationJmxAttributeSource());
@@ -393,8 +412,7 @@
if (executorService == null) {
// we only need a single for the JMX connector
- // TODO use ExecutorServiceStrategy
- executorService = ExecutorServiceHelper.newSingleThreadExecutor("JMXConnector: " + url, true);
+ executorService = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor("JMXConnector: " + url);
}
// execute the JMX connector
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java Sat Mar 6 08:24:59 2010
@@ -18,6 +18,7 @@
import javax.management.ObjectName;
+import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.management.mbean.ManagedCamelContext;
import org.apache.camel.management.mbean.ManagedComponent;
@@ -50,8 +51,8 @@
private static final Log LOG = LogFactory.getLog(ManagedManagementStrategy.class);
- public ManagedManagementStrategy() {
- this(new DefaultManagementAgent());
+ public ManagedManagementStrategy(CamelContext camelContext) {
+ this(new DefaultManagementAgent(camelContext));
}
public ManagedManagementStrategy(ManagementAgent managementAgent) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Sat Mar 6 08:24:59 2010
@@ -251,7 +251,7 @@
protected Resequencer createBatchResequencer(RouteContext routeContext,
BatchResequencerConfig config) throws Exception {
Processor processor = routeContext.createProcessor(this);
- Resequencer resequencer = new Resequencer(processor, resolveExpressionList(routeContext));
+ Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, resolveExpressionList(routeContext));
resequencer.setBatchSize(config.getBatchSize());
resequencer.setBatchTimeout(config.getBatchTimeout());
return resequencer;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Sat Mar 6 08:24:59 2010
@@ -28,7 +28,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.camel.CamelException;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Predicate;
@@ -38,7 +38,6 @@
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -63,17 +62,20 @@
private boolean batchConsumer;
private Predicate completionPredicate;
+ private final CamelContext camelContext;
private final Processor processor;
private final Collection<Exchange> collection;
private ExceptionHandler exceptionHandler;
private final BatchSender sender;
- public BatchProcessor(Processor processor, Collection<Exchange> collection) {
+ public BatchProcessor(CamelContext camelContext, Processor processor, Collection<Exchange> collection) {
+ ObjectHelper.notNull(camelContext, "camelContext");
ObjectHelper.notNull(processor, "processor");
ObjectHelper.notNull(collection, "collection");
// wrap processor in UnitOfWork so what we send out of the batch runs in a UoW
+ this.camelContext = camelContext;
this.processor = new UnitOfWorkProcessor(processor);
this.collection = collection;
this.sender = new BatchSender();
@@ -259,7 +261,7 @@
private Condition exchangeEnqueuedCondition = queueLock.newCondition();
public BatchSender() {
- super(ExecutorServiceHelper.getThreadName("Batch Sender"));
+ super(camelContext.getExecutorServiceStrategy().getThreadName("Batch Sender"));
this.queue = new LinkedList<Exchange>();
}
@@ -335,7 +337,7 @@
sendExchanges();
} catch (Throwable t) {
// a fail safe to handle all exceptions being thrown
- getExceptionHandler().handleException(new CamelException(t));
+ getExceptionHandler().handleException(t);
}
} finally {
queueLock.lock();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Sat Mar 6 08:24:59 2010
@@ -21,6 +21,7 @@
import java.util.Set;
import java.util.TreeSet;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
@@ -37,16 +38,16 @@
// TODO: Rework to avoid using BatchProcessor
- public Resequencer(Processor processor, Expression expression) {
- this(processor, createSet(expression));
+ public Resequencer(CamelContext camelContext, Processor processor, Expression expression) {
+ this(camelContext, processor, createSet(expression));
}
- public Resequencer(Processor processor, List<Expression> expressions) {
- this(processor, createSet(expressions));
+ public Resequencer(CamelContext camelContext, Processor processor, List<Expression> expressions) {
+ this(camelContext, processor, createSet(expressions));
}
- public Resequencer(Processor processor, Set<Exchange> collection) {
- super(processor, collection);
+ public Resequencer(CamelContext camelContext, Processor processor, Set<Exchange> collection) {
+ super(camelContext, processor, collection);
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Sat Mar 6 08:24:59 2010
@@ -23,6 +23,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
@@ -206,8 +207,9 @@
}
try {
engine.deliver();
- } catch (Exception e) {
- exceptionHandler.handleException(e);
+ } catch (Throwable t) {
+ // a fail safe to handle all exceptions being thrown
+ getExceptionHandler().handleException(t);
}
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Sat Mar 6 08:24:59 2010
@@ -22,27 +22,111 @@
import java.util.concurrent.TimeUnit;
/**
+ * Strategy to create thread pools.
+ * <p/>
+ * This strategy is pluggable so you can plugin a custom provider, for example if you want to leverage
+ * the WorkManager for a J2EE server.
+ * <p/>
+ * This strategy has fine grained methods for creating various thread pools, however custom strategies
+ * do not have to exactly create those kind of pools. Feel free to return a shared or different kind of pool.
+ * <p/>
+ * However there are two types of pools: regular and scheduled.
+ *
* @version $Revision$
*/
public interface ExecutorServiceStrategy {
+ /**
+ * Creates a full thread name
+ *
+ * @param nameSuffix suffix which is appended to the thread name
+ * @return the full thread name
+ */
+ String getThreadName(String nameSuffix);
+
+ /**
+ * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}.
+ *
+ * @param executorServiceRef reference to lookup
+ * @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt> if not found
+ */
ExecutorService lookup(String executorServiceRef);
- ExecutorService newCachedThreadPool(String name);
-
- ScheduledExecutorService newScheduledThreadPool(String name, int poolSize);
-
- ExecutorService newFixedThreadPool(String name, int poolSize);
-
- ExecutorService newSingleThreadExecutor(String name);
-
- ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize);
-
- ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize,
+ /**
+ * Creates a new cached thread pool.
+ *
+ * @param nameSuffix suffix which is appended to the thread name
+ * @return the thread pool
+ */
+ ExecutorService newCachedThreadPool(String nameSuffix);
+
+ /**
+ * Creates a new scheduled thread pool.
+ *
+ * @param nameSuffix suffix which is appended to the thread name
+ * @param poolSize the core pool size
+ * @return the thread pool
+ */
+ ScheduledExecutorService newScheduledThreadPool(String nameSuffix, int poolSize);
+
+ /**
+ * Creates a new fixed thread pool.
+ *
+ * @param nameSuffix suffix which is appended to the thread name
+ * @param poolSize the core pool size
+ * @return the thread pool
+ */
+ ExecutorService newFixedThreadPool(String nameSuffix, int poolSize);
+
+ /**
+ * Creates a new single-threaded thread pool. This is often used for background threads.
+ *
+ * @param nameSuffix suffix which is appended to the thread name
+ * @return the thread pool
+ */
+ ExecutorService newSingleThreadExecutor(String nameSuffix);
+
+ /**
+ * Creates a new custom thread pool.
+ * <p/>
+ * Will by default use 60 seconds for keep alive time for idle threads.
+ *
+ * @param nameSuffix suffix which is appended to the thread name
+ * @param corePoolSize the core pool size
+ * @param maxPoolSize the maximum pool size
+ * @return the thread pool
+ */
+ ExecutorService newThreadPool(String nameSuffix, int corePoolSize, int maxPoolSize);
+
+ /**
+ * Creates a new custom thread pool.
+ *
+ * @param nameSuffix suffix which is appended to the thread name
+ * @param corePoolSize the core pool size
+ * @param maxPoolSize the maximum pool size
+ * @param keepAliveTime keep alive time for idle threads
+ * @param timeUnit time unit for keep alive time
+ * @param daemon whether or not the created threads is daemon or not
+ * @return the thread pool
+ */
+ ExecutorService newThreadPool(final String nameSuffix, int corePoolSize, int maxPoolSize,
long keepAliveTime, TimeUnit timeUnit, boolean daemon);
+ /**
+ * Shutdown the given executor service.
+ *
+ * @param executorService the executor service to shutdown
+ * @see java.util.concurrent.ExecutorService#shutdown()
+ */
void shutdown(ExecutorService executorService);
+ /**
+ * Shutdown now the given executor service.
+ *
+ * @param executorService the executor service to shutdown now
+ * @return list of tasks that never commenced execution
+ * @see java.util.concurrent.ExecutorService#shutdownNow()
+ */
List<Runnable> shutdownNow(ExecutorService executorService);
}