You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/19 12:39:02 UTC

svn commit: r925181 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/management/ camel-core/src/main/java/org/apache/camel/management/mbean/ camel-core/src/main/java/org/apache/camel/spi/ came...

Author: davsclaus
Date: Fri Mar 19 11:39:01 2010
New Revision: 925181

URL: http://svn.apache.org/viewvc?rev=925181&view=rev
Log:
CAMEL-1588: Thread pools are now manageable from JMX.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
      - copied, changed from r925107, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java
    camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java Fri Mar 19 11:39:01 2010
@@ -54,8 +54,8 @@ public class DefaultConsumerTemplate imp
         return consumerCache.receive(endpoint);
     }
 
-    public Exchange receive(Endpoint endpoinit) {
-        return receive(endpoinit.getEndpointUri());
+    public Exchange receive(Endpoint endpoint) {
+        return receive(endpoint.getEndpointUri());
     }
 
     public Exchange receive(String endpointUri, long timeout) {
@@ -137,7 +137,7 @@ public class DefaultConsumerTemplate imp
     /**
      * Extracts the body from the given result.
      * <p/>
-     * If the exchange pattern is provided it will try to honor it and retrive the body
+     * If the exchange pattern is provided it will try to honor it and retrieve the body
      * from either IN or OUT according to the pattern.
      *
      * @param result   the result

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Fri Mar 19 11:39:01 2010
@@ -21,10 +21,12 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.spi.ExecutorServiceStrategy;
+import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
@@ -91,8 +93,7 @@ public class DefaultExecutorServiceStrat
 
     public ExecutorService newCachedThreadPool(Object source, String name) {
         ExecutorService answer = ExecutorServiceHelper.newCachedThreadPool(threadNamePattern, name, true);
-        executorServices.add(answer);
-        onNewExecutorService(answer);
+        onThreadPoolCreated(answer);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Created new cached thread pool for source: " + source + " with name: " + name + ". -> " + answer);
@@ -102,8 +103,7 @@ public class DefaultExecutorServiceStrat
 
     public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
         ScheduledExecutorService answer = ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, true);
-        executorServices.add(answer);
-        onNewExecutorService(answer);
+        onThreadPoolCreated(answer);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Created new scheduled thread pool for source: " + source + " with name: " + name + ". [poolSize=" + poolSize + "]. -> " + answer);
@@ -113,8 +113,7 @@ public class DefaultExecutorServiceStrat
 
     public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
         ExecutorService answer = ExecutorServiceHelper.newFixedThreadPool(poolSize, threadNamePattern, name, true);
-        executorServices.add(answer);
-        onNewExecutorService(answer);
+        onThreadPoolCreated(answer);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Created new fixed thread pool for source: " + source + " with name: " + name + ". [poolSize=" + poolSize + "]. -> " + answer);
@@ -124,8 +123,7 @@ public class DefaultExecutorServiceStrat
 
     public ExecutorService newSingleThreadExecutor(Object source, String name) {
         ExecutorService answer = ExecutorServiceHelper.newSingleThreadExecutor(threadNamePattern, name, true);
-        executorServices.add(answer);
-        onNewExecutorService(answer);
+        onThreadPoolCreated(answer);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Created new single thread pool for source: " + source + " with name: " + name + ". -> " + answer);
@@ -135,8 +133,7 @@ public class DefaultExecutorServiceStrat
 
     public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize) {
         ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize);
-        executorServices.add(answer);
-        onNewExecutorService(answer);
+        onThreadPoolCreated(answer);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Created new thread pool for source: " + source + " with name: " + name + ". [poolSize=" + corePoolSize
@@ -150,8 +147,7 @@ public class DefaultExecutorServiceStrat
                                          boolean daemon) {
         ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, keepAliveTime,
                                                                      timeUnit, maxQueueSize, rejectedExecutionHandler, daemon);
-        executorServices.add(answer);
-        onNewExecutorService(answer);
+        onThreadPoolCreated(answer);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Created new thread pool for source: " + source + " with name: " + name + ". [poolSize=" + corePoolSize
@@ -192,6 +188,22 @@ public class DefaultExecutorServiceStrat
         return answer;
     }
 
+    private void onThreadPoolCreated(ExecutorService executorService) {
+        // remember the pool in this strategy's internal list of thread pools
+        executorServices.add(executorService);
+
+        // notify the lifecycle strategies (only for ThreadPoolExecutor, the type onThreadPoolAdd accepts)
+        if (executorService instanceof ThreadPoolExecutor) {
+            ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService;
+            for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
+                lifecycle.onThreadPoolAdd(camelContext, threadPool);
+            }
+        }
+
+        // finally call the strategy callback to allow custom logic
+        onNewExecutorService(executorService);
+    }
+
     /**
      * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created.
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java Fri Mar 19 11:39:01 2010
@@ -20,6 +20,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
 import javax.management.JMException;
 
 import org.apache.camel.CamelContext;
@@ -50,6 +51,7 @@ import org.apache.camel.management.mbean
 import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
 import org.apache.camel.management.mbean.ManagedSendProcessor;
 import org.apache.camel.management.mbean.ManagedService;
+import org.apache.camel.management.mbean.ManagedThreadPool;
 import org.apache.camel.management.mbean.ManagedThrottler;
 import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
 import org.apache.camel.management.mbean.ManagedTracer;
@@ -434,7 +436,31 @@ public class DefaultManagementLifecycleS
         try {
             getManagementStrategy().manageObject(me);
         } catch (Exception e) {
-            LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandlerMBean.", e);
+            LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e);
+        }
+    }
+
+    public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) {
+        // nothing to register while the management agent hasn't been started
+        if (!initialized) {
+            return;
+        }
+
+        ManagedThreadPool mtp = new ManagedThreadPool(camelContext, threadPool);
+        mtp.init(getManagementStrategy());
+
+        // skip thread pools that are already managed, so they are not registered twice
+        if (getManagementStrategy().isManaged(mtp, null)) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("The thread pool is already managed: " + threadPool);
+            }
+            return;
+        }
+
+        try {
+            getManagementStrategy().manageObject(mtp);
+        } catch (Exception e) {
+            LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e);
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java Fri Mar 19 11:39:01 2010
@@ -18,6 +18,7 @@ package org.apache.camel.management;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.ThreadPoolExecutor;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
@@ -54,6 +55,7 @@ public class DefaultManagementNamingStra
     public static final String TYPE_COMPONENT = "components";
     public static final String TYPE_TRACER = "tracer";
     public static final String TYPE_ERRORHANDLER = "errorhandlers";
+    public static final String TYPE_THREAD_POOL = "threadpools";
     public static final String TYPE_SERVICE = "services";
 
     protected String domainName;
@@ -228,6 +230,27 @@ public class DefaultManagementNamingStra
         return createObjectName(buffer);
     }
 
+    /**
+     * Builds the object name for a thread pool from the domain name, the context id,
+     * the thread pool type, and a name made of the pool's simple class name followed by
+     * the value of <tt>ObjectHelper.getIdentityHashCode</tt> for the pool in parentheses.
+     *
+     * @param context    the camel context
+     * @param threadPool the thread pool
+     * @return the object name
+     * @throws MalformedObjectNameException if the composed name is malformed
+     */
+    public ObjectName getObjectNameForThreadPool(CamelContext context, ThreadPoolExecutor threadPool) throws MalformedObjectNameException {
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(domainName).append(":");
+        buffer.append(KEY_CONTEXT + "=").append(getContextId(context)).append(",");
+        buffer.append(KEY_TYPE + "=" + TYPE_THREAD_POOL + ",");
+        buffer.append(KEY_NAME + "=")
+            .append(threadPool.getClass().getSimpleName())
+            .append("(").append(ObjectHelper.getIdentityHashCode(threadPool)).append(")");
+        return createObjectName(buffer);
+    }
+
     public String getDomainName() {
         return domainName;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java Fri Mar 19 11:39:01 2010
@@ -29,6 +29,7 @@ import org.apache.camel.management.mbean
 import org.apache.camel.management.mbean.ManagedProducer;
 import org.apache.camel.management.mbean.ManagedRoute;
 import org.apache.camel.management.mbean.ManagedService;
+import org.apache.camel.management.mbean.ManagedThreadPool;
 import org.apache.camel.management.mbean.ManagedTracer;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.spi.ManagementAgent;
@@ -110,6 +111,9 @@ public class ManagedManagementStrategy e
         } else if (managedObject instanceof ManagedTracer) {
             ManagedTracer mt = (ManagedTracer) managedObject;
             objectName = getManagementNamingStrategy().getObjectNameForTracer(mt.getCamelContext(), mt.getTracer());
+        } else if (managedObject instanceof ManagedThreadPool) {
+            ManagedThreadPool mes = (ManagedThreadPool) managedObject;
+            objectName = getManagementNamingStrategy().getObjectNameForThreadPool(mes.getCamelContext(), mes.getThreadPool());
         } else if (managedObject instanceof ManagedService) {
             // check for managed service should be last
             ManagedService ms = (ManagedService) managedObject;

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java?rev=925181&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java Fri Mar 19 11:39:01 2010
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ManagementStrategy;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedResource;
+
+/**
+ * Exposes a {@link ThreadPoolExecutor} as a managed resource so that its
+ * sizing, keep alive time and task statistics can be read, and its core
+ * size, maximum size and keep alive time can be changed.
+ *
+ * @version $Revision$
+ */
+@ManagedResource(description = "Managed ThreadPool")
+public class ManagedThreadPool {
+
+    private final CamelContext camelContext;
+    private final ThreadPoolExecutor threadPool;
+
+    /**
+     * @param camelContext the camel context
+     * @param threadPool   the thread pool to expose
+     */
+    public ManagedThreadPool(CamelContext camelContext, ThreadPoolExecutor threadPool) {
+        this.camelContext = camelContext;
+        this.threadPool = threadPool;
+    }
+
+    /**
+     * Initialization hook; this managed object needs no initialization.
+     *
+     * @param strategy the management strategy (not used)
+     */
+    public void init(ManagementStrategy strategy) {
+        // do nothing
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public ThreadPoolExecutor getThreadPool() {
+        return threadPool;
+    }
+
+    @ManagedAttribute(description = "Core pool size")
+    public int getCorePoolSize() {
+        return threadPool.getCorePoolSize();
+    }
+
+    @ManagedAttribute(description = "Core pool size")
+    public void setCorePoolSize(int corePoolSize) {
+        threadPool.setCorePoolSize(corePoolSize);
+    }
+
+    @ManagedAttribute(description = "Pool size")
+    public int getPoolSize() {
+        return threadPool.getPoolSize();
+    }
+
+    @ManagedAttribute(description = "Maximum pool size")
+    public int getMaximumPoolSize() {
+        return threadPool.getMaximumPoolSize();
+    }
+
+    @ManagedAttribute(description = "Maximum pool size")
+    public void setMaximumPoolSize(int maximumPoolSize) {
+        threadPool.setMaximumPoolSize(maximumPoolSize);
+    }
+
+    @ManagedAttribute(description = "Largest pool size")
+    public int getLargestPoolSize() {
+        return threadPool.getLargestPoolSize();
+    }
+
+    @ManagedAttribute(description = "Active count")
+    public int getActiveCount() {
+        return threadPool.getActiveCount();
+    }
+
+    @ManagedAttribute(description = "Task count")
+    public long getTaskCount() {
+        return threadPool.getTaskCount();
+    }
+
+    @ManagedAttribute(description = "Completed task count")
+    public long getCompletedTaskCount() {
+        return threadPool.getCompletedTaskCount();
+    }
+
+    @ManagedAttribute(description = "Keep alive time in seconds")
+    public long getKeepAliveTime() {
+        return threadPool.getKeepAliveTime(TimeUnit.SECONDS);
+    }
+
+    /**
+     * Sets the keep alive time of the thread pool.
+     * <p/>
+     * The parameter is a <tt>long</tt> to match {@link #getKeepAliveTime()}: bean introspection
+     * pairs a getter and a setter into one read-write attribute only when their types agree.
+     *
+     * @param keepAliveTimeInSeconds the keep alive time in seconds
+     */
+    @ManagedAttribute(description = "Keep alive time in seconds")
+    public void setKeepAliveTime(long keepAliveTimeInSeconds) {
+        threadPool.setKeepAliveTime(keepAliveTimeInSeconds, TimeUnit.SECONDS);
+    }
+
+    @ManagedAttribute(description = "Is shutdown")
+    public boolean isShutdown() {
+        return threadPool.isShutdown();
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java Fri Mar 19 11:39:01 2010
@@ -30,8 +30,8 @@ import org.springframework.jmx.export.an
 @ManagedResource(description = "Managed Tracer")
 public class ManagedTracer {
 
-    private CamelContext camelContext;
-    private Tracer tracer;
+    private final CamelContext camelContext;
+    private final Tracer tracer;
 
     public ManagedTracer(CamelContext camelContext, Tracer tracer) {
         this.camelContext = camelContext;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java Fri Mar 19 11:39:01 2010
@@ -17,6 +17,7 @@
 package org.apache.camel.spi;
 
 import java.util.Collection;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Component;
@@ -123,4 +124,12 @@ public interface LifecycleStrategy {
      */
     void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerBuilder errorHandlerBuilder);
 
+    /**
+     * Notification on adding a thread pool.
+     *
+     * @param camelContext  the camel context
+     * @param threadPool    the thread pool
+     */
+    void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool);
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java Fri Mar 19 11:39:01 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.spi;
 
+import java.util.concurrent.ThreadPoolExecutor;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
@@ -56,4 +57,7 @@ public interface ManagementNamingStrateg
     ObjectName getObjectNameForTracer(CamelContext context, InterceptStrategy tracer) throws MalformedObjectNameException;
 
     ObjectName getObjectNameForService(CamelContext context, Service service) throws MalformedObjectNameException;
+
+    ObjectName getObjectNameForThreadPool(CamelContext context, ThreadPoolExecutor threadPool) throws MalformedObjectNameException;
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Mar 19 11:39:01 2010
@@ -22,6 +22,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -191,9 +192,14 @@ public final class ExecutorServiceHelper
         }
 
         BlockingQueue<Runnable> queue;
-        if (maxQueueSize <= 0) {
+        if (corePoolSize == 0 && maxQueueSize <= 0) {
+            // use a synchronous queue so we can act like the cached thread pool
+            queue = new SynchronousQueue<Runnable>();
+        } else if (maxQueueSize <= 0) {
+            // unbounded task queue
             queue = new LinkedBlockingQueue<Runnable>();
         } else {
+            // bounded task queue
             queue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
         }
         ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, queue);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java Fri Mar 19 11:39:01 2010
@@ -19,6 +19,7 @@ package org.apache.camel.impl;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Component;
@@ -85,6 +86,10 @@ public class DummyLifecycleStrategy impl
         events.add("onErrorHandlerAdd");
     }
 
+    public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) {
+        events.add("onThreadPoolAdd");
+    }
+
     public List<String> getEvents() {
         return events;
     }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java (from r925107, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java&r1=925107&r2=925181&rev=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java Fri Mar 19 11:39:01 2010
@@ -17,19 +17,17 @@
 package org.apache.camel.management;
 
 import java.util.Set;
-import javax.management.Attribute;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.LoggingLevel;
 import org.apache.camel.builder.RouteBuilder;
 
 /**
  * @version $Revision$
  */
-public class ManagedTracerOptionsTest extends ContextTestSupport {
+public class ManagedThreadPoolTest extends ContextTestSupport {
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
@@ -40,127 +38,40 @@ public class ManagedTracerOptionsTest ex
         return context;
     }
 
-    public void testManagedErrorHandlerOptions() throws Exception {
+    public void testManagedThreadPool() throws Exception {
         MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
 
-        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=tracer,*"), null);
+        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
         assertEquals(1, set.size());
         ObjectName on = set.iterator().next();
 
-        mbeanServer.setAttribute(on, new Attribute("Enabled", Boolean.TRUE));
-        Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
-        assertEquals(true, enabled.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("DestinationUri", null));
-        String duri = (String) mbeanServer.getAttribute(on, "DestinationUri");
-        assertEquals(null, duri);
-
-        mbeanServer.setAttribute(on, new Attribute("DestinationUri", "mock://traced"));
-        duri = (String) mbeanServer.getAttribute(on, "DestinationUri");
-        assertEquals("mock://traced", duri);
-
-        Boolean useJpa = (Boolean) mbeanServer.getAttribute(on, "UseJpa");
-        assertEquals(false, useJpa.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("LogName", "foo"));
-        String ln = (String) mbeanServer.getAttribute(on, "LogName");
-        assertEquals("foo", ln);
-
-        mbeanServer.setAttribute(on, new Attribute("LogLevel", "WARN"));
-        String ll = (String) mbeanServer.getAttribute(on, "LogLevel");
-        assertEquals(LoggingLevel.WARN.name(), ll);
-
-        mbeanServer.setAttribute(on, new Attribute("LogStackTrace", Boolean.TRUE));
-        Boolean lst = (Boolean) mbeanServer.getAttribute(on, "LogStackTrace");
-        assertEquals(true, lst.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("TraceInterceptors", Boolean.TRUE));
-        Boolean ti = (Boolean) mbeanServer.getAttribute(on, "TraceInterceptors");
-        assertEquals(true, ti.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("TraceExceptions", Boolean.TRUE));
-        Boolean te = (Boolean) mbeanServer.getAttribute(on, "TraceExceptions");
-        assertEquals(true, te.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("TraceOutExchanges", Boolean.TRUE));
-        Boolean toe = (Boolean) mbeanServer.getAttribute(on, "TraceOutExchanges");
-        assertEquals(true, toe.booleanValue());
+        Boolean shutdown = (Boolean) mbeanServer.getAttribute(on, "Shutdown");
+        assertEquals(false, shutdown.booleanValue());
 
-        doAssertFormatter(mbeanServer, on);
+        Integer corePoolSize = (Integer) mbeanServer.getAttribute(on, "CorePoolSize");
+        assertEquals(15, corePoolSize.intValue());
+
+        Integer maxPoolSize = (Integer) mbeanServer.getAttribute(on, "MaximumPoolSize");
+        assertEquals(30, maxPoolSize.intValue());
+
+        Integer poolSize = (Integer) mbeanServer.getAttribute(on, "PoolSize");
+        assertEquals(0, poolSize.intValue());
+
+        Long keepAlive = (Long) mbeanServer.getAttribute(on, "KeepAliveTime");
+        assertEquals(60, keepAlive.intValue());
 
         getMockEndpoint("mock:result").expectedMessageCount(1);
         template.sendBody("direct:start", "Hello World");
         assertMockEndpointsSatisfied();
-    }
 
-    private void doAssertFormatter(MBeanServer mbeanServer, ObjectName on) throws Exception {
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowBody", Boolean.TRUE));
-        Boolean fsb = (Boolean) mbeanServer.getAttribute(on, "FormatterShowBody");
-        assertEquals(true, fsb.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowBodyType", Boolean.TRUE));
-        Boolean fsbt = (Boolean) mbeanServer.getAttribute(on, "FormatterShowBodyType");
-        assertEquals(true, fsbt.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowOutBody", Boolean.TRUE));
-        Boolean fsob = (Boolean) mbeanServer.getAttribute(on, "FormatterShowOutBody");
-        assertEquals(true, fsob.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowOutBodyType", Boolean.TRUE));
-        Boolean fsobt = (Boolean) mbeanServer.getAttribute(on, "FormatterShowOutBodyType");
-        assertEquals(true, fsobt.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowBreadCrumb", Boolean.TRUE));
-        Boolean fsbc = (Boolean) mbeanServer.getAttribute(on, "FormatterShowBreadCrumb");
-        assertEquals(true, fsbc.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowExchangeId", Boolean.TRUE));
-        Boolean fsei = (Boolean) mbeanServer.getAttribute(on, "FormatterShowExchangeId");
-        assertEquals(true, fsei.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowShortExchangeId", Boolean.TRUE));
-        Boolean fssei = (Boolean) mbeanServer.getAttribute(on, "FormatterShowShortExchangeId");
-        assertEquals(true, fssei.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowHeaders", Boolean.TRUE));
-        Boolean fsh = (Boolean) mbeanServer.getAttribute(on, "FormatterShowHeaders");
-        assertEquals(true, fsh.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowOutHeaders", Boolean.TRUE));
-        Boolean fsoh = (Boolean) mbeanServer.getAttribute(on, "FormatterShowOutHeaders");
-        assertEquals(true, fsoh.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowProperties", Boolean.TRUE));
-        Boolean fsp = (Boolean) mbeanServer.getAttribute(on, "FormatterShowProperties");
-        assertEquals(true, fsp.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowNode", Boolean.TRUE));
-        Boolean fsn = (Boolean) mbeanServer.getAttribute(on, "FormatterShowNode");
-        assertEquals(true, fsn.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowRouteId", Boolean.FALSE));
-        Boolean fsr = (Boolean) mbeanServer.getAttribute(on, "FormatterShowRouteId");
-        assertEquals(false, fsr.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowExchangePattern", Boolean.TRUE));
-        Boolean fsep = (Boolean) mbeanServer.getAttribute(on, "FormatterShowExchangePattern");
-        assertEquals(true, fsep.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterShowException", Boolean.TRUE));
-        Boolean fsex = (Boolean) mbeanServer.getAttribute(on, "FormatterShowException");
-        assertEquals(true, fsex.booleanValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterBreadCrumbLength", 100));
-        Integer fbcl = (Integer) mbeanServer.getAttribute(on, "FormatterBreadCrumbLength");
-        assertEquals(100, fbcl.intValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterNodeLength", 50));
-        Integer fnl = (Integer) mbeanServer.getAttribute(on, "FormatterNodeLength");
-        assertEquals(50, fnl.intValue());
-
-        mbeanServer.setAttribute(on, new Attribute("FormatterMaxChars", 250));
-        Integer fmc = (Integer) mbeanServer.getAttribute(on, "FormatterMaxChars");
-        assertEquals(250, fmc.intValue());
+        poolSize = (Integer) mbeanServer.getAttribute(on, "PoolSize");
+        assertEquals(1, poolSize.intValue());
+
+        Integer largest = (Integer) mbeanServer.getAttribute(on, "LargestPoolSize");
+        assertEquals(1, largest.intValue());
+
+        Long completed = (Long) mbeanServer.getAttribute(on, "CompletedTaskCount");
+        assertEquals(1, completed.intValue());
     }
 
     @Override
@@ -168,7 +79,7 @@ public class ManagedTracerOptionsTest ex
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to("mock:result");
+                from("direct:start").threads(15, 30).to("mock:result");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java Fri Mar 19 11:39:01 2010
@@ -40,7 +40,7 @@ public class ManagedTracerOptionsTest ex
         return context;
     }
 
-    public void testManagedErrorHandlerOptions() throws Exception {
+    public void testManagedTracerOptions() throws Exception {
         MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
 
         Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=tracer,*"), null);

Modified: camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java (original)
+++ camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java Fri Mar 19 11:39:01 2010
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Component;
@@ -129,4 +130,9 @@ public class OsgiServiceRegistry impleme
     public void onErrorHandlerAdd(RouteContext routeContext, Processor processor, ErrorHandlerBuilder errorHandlerBuilder) {
         // Do nothing here
     }
+
+    public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) {
+        // Do nothing here
+    }
+
 }

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java Fri Mar 19 11:39:01 2010
@@ -17,6 +17,7 @@
 package org.apache.camel.spring;
 
 import java.util.Collection;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Component;
@@ -70,4 +71,7 @@ public class DummyLifecycleStrategy impl
 
     public void onRoutesAdd(Collection<Route> routes) {
     }
+
+    public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) {
+    }
 }