You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/06 09:25:00 UTC

svn commit: r919713 - in /camel/trunk/camel-core/src/main/java/org/apache/camel: impl/ management/ model/ processor/ spi/

Author: davsclaus
Date: Sat Mar  6 08:24:59 2010
New Revision: 919713

URL: http://svn.apache.org/viewvc?rev=919713&view=rev
Log:
CAMEL-1437: Added SPI ExecutorServiceStrategy to abstrct how to use thread pools with Camel.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sat Mar  6 08:24:59 2010
@@ -1556,7 +1556,7 @@
         } else {
             try {
                 LOG.info("JMX enabled. Using DefaultManagedLifecycleStrategy.");
-                answer = new ManagedManagementStrategy(new DefaultManagementAgent());
+                answer = new ManagedManagementStrategy(new DefaultManagementAgent(this));
                 // prefer to have it at first strategy
                 lifecycleStrategies.add(0, new DefaultManagementLifecycleStrategy(this));
             } catch (NoClassDefFoundError e) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Sat Mar  6 08:24:59 2010
@@ -36,32 +36,36 @@
         this.camelContext = camelContext;
     }
 
+    public String getThreadName(String nameSuffix) {
+        return ExecutorServiceHelper.getThreadName(nameSuffix);
+    }
+
     public ExecutorService lookup(String executorServiceRef) {
         return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
     }
 
-    public ExecutorService newCachedThreadPool(String name) {
-        return ExecutorServiceHelper.newCachedThreadPool(name, true);
+    public ExecutorService newCachedThreadPool(String nameSuffix) {
+        return ExecutorServiceHelper.newCachedThreadPool(nameSuffix, true);
     }
 
-    public ScheduledExecutorService newScheduledThreadPool(String name, int poolSize) {
-        return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, true);
+    public ScheduledExecutorService newScheduledThreadPool(String nameSuffix, int poolSize) {
+        return ExecutorServiceHelper.newScheduledThreadPool(poolSize, nameSuffix, true);
     }
 
-    public ExecutorService newFixedThreadPool(String name, int poolSize) {
-        return ExecutorServiceHelper.newFixedThreadPool(poolSize, name, true);
+    public ExecutorService newFixedThreadPool(String nameSuffix, int poolSize) {
+        return ExecutorServiceHelper.newFixedThreadPool(poolSize, nameSuffix, true);
     }
 
-    public ExecutorService newSingleThreadExecutor(String name) {
-        return ExecutorServiceHelper.newSingleThreadExecutor(name, true);
+    public ExecutorService newSingleThreadExecutor(String nameSuffix) {
+        return ExecutorServiceHelper.newSingleThreadExecutor(nameSuffix, true);
     }
 
-    public ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize) {
-        return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize);
+    public ExecutorService newThreadPool(String nameSuffix, int corePoolSize, int maxPoolSize) {
+        return ExecutorServiceHelper.newThreadPool(nameSuffix, corePoolSize, maxPoolSize);
     }
 
-    public ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
-        return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
+    public ExecutorService newThreadPool(String nameSuffix, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
+        return ExecutorServiceHelper.newThreadPool(nameSuffix, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
     }
 
     public void shutdown(ExecutorService executorService) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Sat Mar  6 08:24:59 2010
@@ -240,7 +240,7 @@
 
     @Override
     protected void doStart() throws Exception {
-        ObjectHelper.notNull(camelContext, "CamelContext must be set");
+        ObjectHelper.notNull(camelContext, "CamelContext");
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java Sat Mar  6 08:24:59 2010
@@ -39,9 +39,11 @@
 import javax.management.remote.JMXConnectorServerFactory;
 import javax.management.remote.JMXServiceURL;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.ManagementAgent;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.jmx.export.annotation.AnnotationJmxAttributeSource;
@@ -50,7 +52,7 @@
 /**
  * Default implementation of the Camel JMX service agent
  */
-public class DefaultManagementAgent extends ServiceSupport implements ManagementAgent {
+public class DefaultManagementAgent extends ServiceSupport implements ManagementAgent, CamelContextAware {
 
     public static final String DEFAULT_DOMAIN = "org.apache.camel";
     public static final String DEFAULT_HOST = "localhost";
@@ -59,6 +61,7 @@
     public static final String DEFAULT_SERVICE_URL_PATH = "/jmxrmi/camel";
     private static final transient Log LOG = LogFactory.getLog(DefaultManagementAgent.class);
 
+    private CamelContext camelContext;
     private ExecutorService executorService;
     private MBeanServer server;
     private final Set<ObjectName> mbeansRegistered = new HashSet<ObjectName>();
@@ -74,6 +77,13 @@
     private Boolean createConnector;
     private Boolean onlyRegisterProcessorWithCustomId;
 
+    public DefaultManagementAgent() {
+    }
+
+    public DefaultManagementAgent(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
     protected void finalizeSettings() {
         if (registryPort == null) {
             registryPort = Integer.getInteger(JmxSystemPropertyKeys.REGISTRY_PORT, DEFAULT_REGISTRY_PORT);
@@ -189,6 +199,14 @@
         this.executorService = executorService;
     }
 
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
     public void register(Object obj, ObjectName name) throws JMException {
         register(obj, name, false);
     }
@@ -224,6 +242,7 @@
     }
 
     protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext");
         assembler = new MetadataMBeanInfoAssembler();
         assembler.setAttributeSource(new AnnotationJmxAttributeSource());
 
@@ -393,8 +412,7 @@
 
         if (executorService == null) {
             // we only need a single for the JMX connector
-            // TODO use ExecutorServiceStrategy
-            executorService = ExecutorServiceHelper.newSingleThreadExecutor("JMXConnector: " + url, true);
+            executorService = camelContext.getExecutorServiceStrategy().newSingleThreadExecutor("JMXConnector: " + url);
         }
 
         // execute the JMX connector

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java Sat Mar  6 08:24:59 2010
@@ -18,6 +18,7 @@
 
 import javax.management.ObjectName;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.management.mbean.ManagedCamelContext;
 import org.apache.camel.management.mbean.ManagedComponent;
@@ -50,8 +51,8 @@
 
     private static final Log LOG = LogFactory.getLog(ManagedManagementStrategy.class);
 
-    public ManagedManagementStrategy() {
-        this(new DefaultManagementAgent());
+    public ManagedManagementStrategy(CamelContext camelContext) {
+        this(new DefaultManagementAgent(camelContext));
     }
 
     public ManagedManagementStrategy(ManagementAgent managementAgent) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Sat Mar  6 08:24:59 2010
@@ -251,7 +251,7 @@
     protected Resequencer createBatchResequencer(RouteContext routeContext,
             BatchResequencerConfig config) throws Exception {
         Processor processor = routeContext.createProcessor(this);
-        Resequencer resequencer = new Resequencer(processor, resolveExpressionList(routeContext));
+        Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, resolveExpressionList(routeContext));
         resequencer.setBatchSize(config.getBatchSize());
         resequencer.setBatchTimeout(config.getBatchTimeout());
         return resequencer;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Sat Mar  6 08:24:59 2010
@@ -28,7 +28,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.camel.CamelException;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Predicate;
@@ -38,7 +38,6 @@
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -63,17 +62,20 @@
     private boolean batchConsumer;
     private Predicate completionPredicate;
 
+    private final CamelContext camelContext;
     private final Processor processor;
     private final Collection<Exchange> collection;
     private ExceptionHandler exceptionHandler;
 
     private final BatchSender sender;
 
-    public BatchProcessor(Processor processor, Collection<Exchange> collection) {
+    public BatchProcessor(CamelContext camelContext, Processor processor, Collection<Exchange> collection) {
+        ObjectHelper.notNull(camelContext, "camelContext");
         ObjectHelper.notNull(processor, "processor");
         ObjectHelper.notNull(collection, "collection");
 
         // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW
+        this.camelContext = camelContext;
         this.processor = new UnitOfWorkProcessor(processor);
         this.collection = collection;
         this.sender = new BatchSender();
@@ -259,7 +261,7 @@
         private Condition exchangeEnqueuedCondition = queueLock.newCondition();
 
         public BatchSender() {
-            super(ExecutorServiceHelper.getThreadName("Batch Sender"));
+            super(camelContext.getExecutorServiceStrategy().getThreadName("Batch Sender"));
             this.queue = new LinkedList<Exchange>();
         }
 
@@ -335,7 +337,7 @@
                                 sendExchanges();
                             } catch (Throwable t) {
                                 // a fail safe to handle all exceptions being thrown
-                                getExceptionHandler().handleException(new CamelException(t));
+                                getExceptionHandler().handleException(t);
                             }
                         } finally {
                             queueLock.lock();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Sat Mar  6 08:24:59 2010
@@ -21,6 +21,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
@@ -37,16 +38,16 @@
 
     // TODO: Rework to avoid using BatchProcessor
 
-    public Resequencer(Processor processor, Expression expression) {
-        this(processor, createSet(expression));
+    public Resequencer(CamelContext camelContext, Processor processor, Expression expression) {
+        this(camelContext, processor, createSet(expression));
     }
 
-    public Resequencer(Processor processor, List<Expression> expressions) {
-        this(processor, createSet(expressions));
+    public Resequencer(CamelContext camelContext, Processor processor, List<Expression> expressions) {
+        this(camelContext, processor, createSet(expressions));
     }
 
-    public Resequencer(Processor processor, Set<Exchange> collection) {
-        super(processor, collection);
+    public Resequencer(CamelContext camelContext, Processor processor, Set<Exchange> collection) {
+        super(camelContext, processor, collection);
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Sat Mar  6 08:24:59 2010
@@ -23,6 +23,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
@@ -206,8 +207,9 @@
                 }
                 try {
                     engine.deliver();
-                } catch (Exception e) {
-                    exceptionHandler.handleException(e);
+                } catch (Throwable t) {
+                    // a fail safe to handle all exceptions being thrown
+                    getExceptionHandler().handleException(t);
                 }
             }
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=919713&r1=919712&r2=919713&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Sat Mar  6 08:24:59 2010
@@ -22,27 +22,111 @@
 import java.util.concurrent.TimeUnit;
 
 /**
+ * Strategy to create thread pools.
+ * <p/>
+ * This strategy is pluggable so you can plugin a custom provider, for example if you want to leverage
+ * the WorkManager for a J2EE server.
+ * <p/>
+ * This strategy has fine grained methods for creating various thread pools, however custom strategies
+ * do not have to exactly create those kind of pools. Feel free to return a shared or different kind of pool.
+ * <p/>
+ * However there are two types of pools: regular and scheduled.
+ *
  * @version $Revision$
  */
 public interface ExecutorServiceStrategy {
 
+    /**
+     * Creates a full thread name
+     *
+     * @param nameSuffix  suffix which is appended to the thread name
+     * @return the full thread name
+     */
+    String getThreadName(String nameSuffix);
+
+    /**
+     * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}.
+     *
+     * @param executorServiceRef  reference to lookup
+     * @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt> if not found
+     */
     ExecutorService lookup(String executorServiceRef);
 
-    ExecutorService newCachedThreadPool(String name);
-
-    ScheduledExecutorService newScheduledThreadPool(String name, int poolSize);
-
-    ExecutorService newFixedThreadPool(String name, int poolSize);
-
-    ExecutorService newSingleThreadExecutor(String name);
-
-    ExecutorService newThreadPool(String name, int corePoolSize, int maxPoolSize);
-
-    ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize,
+    /**
+     * Creates a new cached thread pool.
+     *
+     * @param nameSuffix  suffix which is appended to the thread name
+     * @return the thread pool
+     */
+    ExecutorService newCachedThreadPool(String nameSuffix);
+
+    /**
+     * Creates a new scheduled thread pool.
+     *
+     * @param nameSuffix  suffix which is appended to the thread name
+     * @param poolSize    the core pool size
+     * @return the thread pool
+     */
+    ScheduledExecutorService newScheduledThreadPool(String nameSuffix, int poolSize);
+
+    /**
+     * Creates a new fixed thread pool.
+     *
+     * @param nameSuffix  suffix which is appended to the thread name
+     * @param poolSize    the core pool size
+     * @return the thread pool
+     */
+    ExecutorService newFixedThreadPool(String nameSuffix, int poolSize);
+
+    /**
+     * Creates a new single-threaded thread pool. This is often used for background threads.
+     *
+     * @param nameSuffix  suffix which is appended to the thread name
+     * @return the thread pool
+     */
+    ExecutorService newSingleThreadExecutor(String nameSuffix);
+
+    /**
+     * Creates a new custom thread pool.
+     * <p/>
+     * Will by default use 60 seconds for keep alive time for idle threads.
+     *
+     * @param nameSuffix    suffix which is appended to the thread name
+     * @param corePoolSize  the core pool size
+     * @param maxPoolSize   the maximum pool size
+     * @return the thread pool
+     */
+    ExecutorService newThreadPool(String nameSuffix, int corePoolSize, int maxPoolSize);
+
+    /**
+     * Creates a new custom thread pool.
+     *
+     * @param nameSuffix    suffix which is appended to the thread name
+     * @param corePoolSize  the core pool size
+     * @param maxPoolSize   the maximum pool size
+     * @param keepAliveTime keep alive time for idle threads
+     * @param timeUnit      time unit for keep alive time
+     * @param daemon        whether or not the created threads is daemon or not
+     * @return the thread pool
+     */
+    ExecutorService newThreadPool(final String nameSuffix, int corePoolSize, int maxPoolSize,
                                   long keepAliveTime, TimeUnit timeUnit, boolean daemon);
 
+    /**
+     * Shutdown the given executor service.
+     *
+     * @param executorService the executor service to shutdown
+     * @see java.util.concurrent.ExecutorService#shutdown()
+     */
     void shutdown(ExecutorService executorService);
 
+    /**
+     * Shutdown now the given executor service.
+     *
+     * @param executorService the executor service to shutdown now
+     * @return list of tasks that never commenced execution
+     * @see java.util.concurrent.ExecutorService#shutdownNow()
+     */
     List<Runnable> shutdownNow(ExecutorService executorService);
 
 }