You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/19 12:39:02 UTC
svn commit: r925181 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/management/
camel-core/src/main/java/org/apache/camel/management/mbean/
camel-core/src/main/java/org/apache/camel/spi/ came...
Author: davsclaus
Date: Fri Mar 19 11:39:01 2010
New Revision: 925181
URL: http://svn.apache.org/viewvc?rev=925181&view=rev
Log:
CAMEL-1588: ThreadPools are now manageable from JMX.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
- copied, changed from r925107, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java
camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java Fri Mar 19 11:39:01 2010
@@ -54,8 +54,8 @@ public class DefaultConsumerTemplate imp
return consumerCache.receive(endpoint);
}
- public Exchange receive(Endpoint endpoinit) {
- return receive(endpoinit.getEndpointUri());
+ public Exchange receive(Endpoint endpoint) {
+ return receive(endpoint.getEndpointUri());
}
public Exchange receive(String endpointUri, long timeout) {
@@ -137,7 +137,7 @@ public class DefaultConsumerTemplate imp
/**
* Extracts the body from the given result.
* <p/>
- * If the exchange pattern is provided it will try to honor it and retrive the body
+ * If the exchange pattern is provided it will try to honor it and retrieve the body
* from either IN or OUT according to the pattern.
*
* @param result the result
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Fri Mar 19 11:39:01 2010
@@ -21,10 +21,12 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.spi.ExecutorServiceStrategy;
+import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
@@ -91,8 +93,7 @@ public class DefaultExecutorServiceStrat
public ExecutorService newCachedThreadPool(Object source, String name) {
ExecutorService answer = ExecutorServiceHelper.newCachedThreadPool(threadNamePattern, name, true);
- executorServices.add(answer);
- onNewExecutorService(answer);
+ onThreadPoolCreated(answer);
if (LOG.isDebugEnabled()) {
LOG.debug("Created new cached thread pool for source: " + source + " with name: " + name + ". -> " + answer);
@@ -102,8 +103,7 @@ public class DefaultExecutorServiceStrat
public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
ScheduledExecutorService answer = ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, true);
- executorServices.add(answer);
- onNewExecutorService(answer);
+ onThreadPoolCreated(answer);
if (LOG.isDebugEnabled()) {
LOG.debug("Created new scheduled thread pool for source: " + source + " with name: " + name + ". [poolSize=" + poolSize + "]. -> " + answer);
@@ -113,8 +113,7 @@ public class DefaultExecutorServiceStrat
public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
ExecutorService answer = ExecutorServiceHelper.newFixedThreadPool(poolSize, threadNamePattern, name, true);
- executorServices.add(answer);
- onNewExecutorService(answer);
+ onThreadPoolCreated(answer);
if (LOG.isDebugEnabled()) {
LOG.debug("Created new fixed thread pool for source: " + source + " with name: " + name + ". [poolSize=" + poolSize + "]. -> " + answer);
@@ -124,8 +123,7 @@ public class DefaultExecutorServiceStrat
public ExecutorService newSingleThreadExecutor(Object source, String name) {
ExecutorService answer = ExecutorServiceHelper.newSingleThreadExecutor(threadNamePattern, name, true);
- executorServices.add(answer);
- onNewExecutorService(answer);
+ onThreadPoolCreated(answer);
if (LOG.isDebugEnabled()) {
LOG.debug("Created new single thread pool for source: " + source + " with name: " + name + ". -> " + answer);
@@ -135,8 +133,7 @@ public class DefaultExecutorServiceStrat
public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize) {
ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize);
- executorServices.add(answer);
- onNewExecutorService(answer);
+ onThreadPoolCreated(answer);
if (LOG.isDebugEnabled()) {
LOG.debug("Created new thread pool for source: " + source + " with name: " + name + ". [poolSize=" + corePoolSize
@@ -150,8 +147,7 @@ public class DefaultExecutorServiceStrat
boolean daemon) {
ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, keepAliveTime,
timeUnit, maxQueueSize, rejectedExecutionHandler, daemon);
- executorServices.add(answer);
- onNewExecutorService(answer);
+ onThreadPoolCreated(answer);
if (LOG.isDebugEnabled()) {
LOG.debug("Created new thread pool for source: " + source + " with name: " + name + ". [poolSize=" + corePoolSize
@@ -192,6 +188,22 @@ public class DefaultExecutorServiceStrat
return answer;
}
+ private void onThreadPoolCreated(ExecutorService executorService) {
+ // add to internal list of thread pools
+ executorServices.add(executorService);
+
+ // let lifecycle strategy be notified as well which can let it be managed in JMX as well
+ if (executorService instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService;
+ for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
+ lifecycle.onThreadPoolAdd(camelContext, threadPool);
+ }
+ }
+
+ // now call strategy to allow custom logic
+ onNewExecutorService(executorService);
+ }
+
/**
* Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created.
*
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java Fri Mar 19 11:39:01 2010
@@ -20,6 +20,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
import javax.management.JMException;
import org.apache.camel.CamelContext;
@@ -50,6 +51,7 @@ import org.apache.camel.management.mbean
import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
import org.apache.camel.management.mbean.ManagedSendProcessor;
import org.apache.camel.management.mbean.ManagedService;
+import org.apache.camel.management.mbean.ManagedThreadPool;
import org.apache.camel.management.mbean.ManagedThrottler;
import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
import org.apache.camel.management.mbean.ManagedTracer;
@@ -434,7 +436,31 @@ public class DefaultManagementLifecycleS
try {
getManagementStrategy().manageObject(me);
} catch (Exception e) {
- LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandlerMBean.", e);
+ LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e);
+ }
+ }
+
+ public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) {
+ // the agent hasn't been started
+ if (!initialized) {
+ return;
+ }
+
+ ManagedThreadPool mtp = new ManagedThreadPool(camelContext, threadPool);
+ mtp.init(getManagementStrategy());
+
+ // skip already managed thread pools, for example if a route has been restarted
+ if (getManagementStrategy().isManaged(mtp, null)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("The thread pool is already managed: " + threadPool);
+ }
+ return;
+ }
+
+ try {
+ getManagementStrategy().manageObject(mtp);
+ } catch (Exception e) {
+ LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java Fri Mar 19 11:39:01 2010
@@ -18,6 +18,7 @@ package org.apache.camel.management;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.ThreadPoolExecutor;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
@@ -54,6 +55,7 @@ public class DefaultManagementNamingStra
public static final String TYPE_COMPONENT = "components";
public static final String TYPE_TRACER = "tracer";
public static final String TYPE_ERRORHANDLER = "errorhandlers";
+ public static final String TYPE_THREAD_POOL = "threadpools";
public static final String TYPE_SERVICE = "services";
protected String domainName;
@@ -228,6 +230,17 @@ public class DefaultManagementNamingStra
return createObjectName(buffer);
}
+ public ObjectName getObjectNameForThreadPool(CamelContext context, ThreadPoolExecutor threadPool) throws MalformedObjectNameException {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(domainName).append(":");
+ buffer.append(KEY_CONTEXT + "=").append(getContextId(context)).append(",");
+ buffer.append(KEY_TYPE + "=" + TYPE_THREAD_POOL + ",");
+ buffer.append(KEY_NAME + "=")
+ .append(threadPool.getClass().getSimpleName())
+ .append("(").append(ObjectHelper.getIdentityHashCode(threadPool)).append(")");
+ return createObjectName(buffer);
+ }
+
public String getDomainName() {
return domainName;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java Fri Mar 19 11:39:01 2010
@@ -29,6 +29,7 @@ import org.apache.camel.management.mbean
import org.apache.camel.management.mbean.ManagedProducer;
import org.apache.camel.management.mbean.ManagedRoute;
import org.apache.camel.management.mbean.ManagedService;
+import org.apache.camel.management.mbean.ManagedThreadPool;
import org.apache.camel.management.mbean.ManagedTracer;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.spi.ManagementAgent;
@@ -110,6 +111,9 @@ public class ManagedManagementStrategy e
} else if (managedObject instanceof ManagedTracer) {
ManagedTracer mt = (ManagedTracer) managedObject;
objectName = getManagementNamingStrategy().getObjectNameForTracer(mt.getCamelContext(), mt.getTracer());
+ } else if (managedObject instanceof ManagedThreadPool) {
+ ManagedThreadPool mes = (ManagedThreadPool) managedObject;
+ objectName = getManagementNamingStrategy().getObjectNameForThreadPool(mes.getCamelContext(), mes.getThreadPool());
} else if (managedObject instanceof ManagedService) {
// check for managed service should be last
ManagedService ms = (ManagedService) managedObject;
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java?rev=925181&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java Fri Mar 19 11:39:01 2010
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ManagementStrategy;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedResource;
+
+/**
+ * @version $Revision$
+ */
+@ManagedResource(description = "Managed ThreadPool")
+public class ManagedThreadPool {
+
+ private final CamelContext camelContext;
+ private final ThreadPoolExecutor threadPool;
+
+ public ManagedThreadPool(CamelContext camelContext, ThreadPoolExecutor threadPool) {
+ this.camelContext = camelContext;
+ this.threadPool = threadPool;
+ }
+
+ public void init(ManagementStrategy strategy) {
+ // do nothing
+ }
+
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public ThreadPoolExecutor getThreadPool() {
+ return threadPool;
+ }
+
+ @ManagedAttribute(description = "Core pool size")
+ public int getCorePoolSize() {
+ return threadPool.getCorePoolSize();
+ }
+
+ @ManagedAttribute(description = "Core pool size")
+ public void setCorePoolSize(int corePoolSize) {
+ threadPool.setCorePoolSize(corePoolSize);
+ }
+
+ @ManagedAttribute(description = "Pool size")
+ public int getPoolSize() {
+ return threadPool.getPoolSize();
+ }
+
+ @ManagedAttribute(description = "Maximum pool size")
+ public int getMaximumPoolSize() {
+ return threadPool.getMaximumPoolSize();
+ }
+
+ @ManagedAttribute(description = "Maximum pool size")
+ public void setMaximumPoolSize(int maximumPoolSize) {
+ threadPool.setMaximumPoolSize(maximumPoolSize);
+ }
+
+ @ManagedAttribute(description = "Largest pool size")
+ public int getLargestPoolSize() {
+ return threadPool.getLargestPoolSize();
+ }
+
+ @ManagedAttribute(description = "Active count")
+ public int getActiveCount() {
+ return threadPool.getActiveCount();
+ }
+
+ @ManagedAttribute(description = "Task count")
+ public long getTaskCount() {
+ return threadPool.getTaskCount();
+ }
+
+ @ManagedAttribute(description = "Completed task count")
+ public long getCompletedTaskCount() {
+ return threadPool.getCompletedTaskCount();
+ }
+
+ @ManagedAttribute(description = "Keep alive time in seconds")
+ public long getKeepAliveTime() {
+ return threadPool.getKeepAliveTime(TimeUnit.SECONDS);
+ }
+
+ @ManagedAttribute(description = "Keep alive time in seconds")
+ public void setKeepAliveTime(int keepAliveTimeInSeconds) {
+ threadPool.setKeepAliveTime(keepAliveTimeInSeconds, TimeUnit.SECONDS);
+ }
+
+ @ManagedAttribute(description = "Is shutdown")
+ public boolean isShutdown() {
+ return threadPool.isShutdown();
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java Fri Mar 19 11:39:01 2010
@@ -30,8 +30,8 @@ import org.springframework.jmx.export.an
@ManagedResource(description = "Managed Tracer")
public class ManagedTracer {
- private CamelContext camelContext;
- private Tracer tracer;
+ private final CamelContext camelContext;
+ private final Tracer tracer;
public ManagedTracer(CamelContext camelContext, Tracer tracer) {
this.camelContext = camelContext;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java Fri Mar 19 11:39:01 2010
@@ -17,6 +17,7 @@
package org.apache.camel.spi;
import java.util.Collection;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
@@ -123,4 +124,12 @@ public interface LifecycleStrategy {
*/
void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerBuilder errorHandlerBuilder);
+ /**
+ * Notification on adding a thread pool.
+ *
+ * @param camelContext the camel context
+ * @param threadPool the thread pool
+ */
+ void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool);
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java Fri Mar 19 11:39:01 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.spi;
+import java.util.concurrent.ThreadPoolExecutor;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
@@ -56,4 +57,7 @@ public interface ManagementNamingStrateg
ObjectName getObjectNameForTracer(CamelContext context, InterceptStrategy tracer) throws MalformedObjectNameException;
ObjectName getObjectNameForService(CamelContext context, Service service) throws MalformedObjectNameException;
+
+ ObjectName getObjectNameForThreadPool(CamelContext context, ThreadPoolExecutor threadPool) throws MalformedObjectNameException;
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Mar 19 11:39:01 2010
@@ -22,6 +22,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -191,9 +192,14 @@ public final class ExecutorServiceHelper
}
BlockingQueue<Runnable> queue;
- if (maxQueueSize <= 0) {
+ if (corePoolSize == 0 && maxQueueSize <= 0) {
+ // use a synchronous queue so we can act like the cached thread pool
+ queue = new SynchronousQueue<Runnable>();
+ } else if (maxQueueSize <= 0) {
+ // unbounded task queue
queue = new LinkedBlockingQueue<Runnable>();
} else {
+ // bounded task queue
queue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, queue);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java Fri Mar 19 11:39:01 2010
@@ -19,6 +19,7 @@ package org.apache.camel.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
@@ -85,6 +86,10 @@ public class DummyLifecycleStrategy impl
events.add("onErrorHandlerAdd");
}
+ public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) {
+ events.add("onThreadPoolAdd");
+ }
+
public List<String> getEvents() {
return events;
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java (from r925107, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java&r1=925107&r2=925181&rev=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java Fri Mar 19 11:39:01 2010
@@ -17,19 +17,17 @@
package org.apache.camel.management;
import java.util.Set;
-import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
/**
* @version $Revision$
*/
-public class ManagedTracerOptionsTest extends ContextTestSupport {
+public class ManagedThreadPoolTest extends ContextTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
@@ -40,127 +38,40 @@ public class ManagedTracerOptionsTest ex
return context;
}
- public void testManagedErrorHandlerOptions() throws Exception {
+ public void testManagedThreadPool() throws Exception {
MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
- Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=tracer,*"), null);
+ Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
assertEquals(1, set.size());
ObjectName on = set.iterator().next();
- mbeanServer.setAttribute(on, new Attribute("Enabled", Boolean.TRUE));
- Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
- assertEquals(true, enabled.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("DestinationUri", null));
- String duri = (String) mbeanServer.getAttribute(on, "DestinationUri");
- assertEquals(null, duri);
-
- mbeanServer.setAttribute(on, new Attribute("DestinationUri", "mock://traced"));
- duri = (String) mbeanServer.getAttribute(on, "DestinationUri");
- assertEquals("mock://traced", duri);
-
- Boolean useJpa = (Boolean) mbeanServer.getAttribute(on, "UseJpa");
- assertEquals(false, useJpa.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("LogName", "foo"));
- String ln = (String) mbeanServer.getAttribute(on, "LogName");
- assertEquals("foo", ln);
-
- mbeanServer.setAttribute(on, new Attribute("LogLevel", "WARN"));
- String ll = (String) mbeanServer.getAttribute(on, "LogLevel");
- assertEquals(LoggingLevel.WARN.name(), ll);
-
- mbeanServer.setAttribute(on, new Attribute("LogStackTrace", Boolean.TRUE));
- Boolean lst = (Boolean) mbeanServer.getAttribute(on, "LogStackTrace");
- assertEquals(true, lst.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("TraceInterceptors", Boolean.TRUE));
- Boolean ti = (Boolean) mbeanServer.getAttribute(on, "TraceInterceptors");
- assertEquals(true, ti.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("TraceExceptions", Boolean.TRUE));
- Boolean te = (Boolean) mbeanServer.getAttribute(on, "TraceExceptions");
- assertEquals(true, te.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("TraceOutExchanges", Boolean.TRUE));
- Boolean toe = (Boolean) mbeanServer.getAttribute(on, "TraceOutExchanges");
- assertEquals(true, toe.booleanValue());
+ Boolean shutdown = (Boolean) mbeanServer.getAttribute(on, "Shutdown");
+ assertEquals(false, shutdown.booleanValue());
- doAssertFormatter(mbeanServer, on);
+ Integer corePoolSize = (Integer) mbeanServer.getAttribute(on, "CorePoolSize");
+ assertEquals(15, corePoolSize.intValue());
+
+ Integer maxPoolSize = (Integer) mbeanServer.getAttribute(on, "MaximumPoolSize");
+ assertEquals(30, maxPoolSize.intValue());
+
+ Integer poolSize = (Integer) mbeanServer.getAttribute(on, "PoolSize");
+ assertEquals(0, poolSize.intValue());
+
+ Long keepAlive = (Long) mbeanServer.getAttribute(on, "KeepAliveTime");
+ assertEquals(60, keepAlive.intValue());
getMockEndpoint("mock:result").expectedMessageCount(1);
template.sendBody("direct:start", "Hello World");
assertMockEndpointsSatisfied();
- }
- private void doAssertFormatter(MBeanServer mbeanServer, ObjectName on) throws Exception {
- mbeanServer.setAttribute(on, new Attribute("FormatterShowBody", Boolean.TRUE));
- Boolean fsb = (Boolean) mbeanServer.getAttribute(on, "FormatterShowBody");
- assertEquals(true, fsb.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowBodyType", Boolean.TRUE));
- Boolean fsbt = (Boolean) mbeanServer.getAttribute(on, "FormatterShowBodyType");
- assertEquals(true, fsbt.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowOutBody", Boolean.TRUE));
- Boolean fsob = (Boolean) mbeanServer.getAttribute(on, "FormatterShowOutBody");
- assertEquals(true, fsob.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowOutBodyType", Boolean.TRUE));
- Boolean fsobt = (Boolean) mbeanServer.getAttribute(on, "FormatterShowOutBodyType");
- assertEquals(true, fsobt.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowBreadCrumb", Boolean.TRUE));
- Boolean fsbc = (Boolean) mbeanServer.getAttribute(on, "FormatterShowBreadCrumb");
- assertEquals(true, fsbc.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowExchangeId", Boolean.TRUE));
- Boolean fsei = (Boolean) mbeanServer.getAttribute(on, "FormatterShowExchangeId");
- assertEquals(true, fsei.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowShortExchangeId", Boolean.TRUE));
- Boolean fssei = (Boolean) mbeanServer.getAttribute(on, "FormatterShowShortExchangeId");
- assertEquals(true, fssei.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowHeaders", Boolean.TRUE));
- Boolean fsh = (Boolean) mbeanServer.getAttribute(on, "FormatterShowHeaders");
- assertEquals(true, fsh.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowOutHeaders", Boolean.TRUE));
- Boolean fsoh = (Boolean) mbeanServer.getAttribute(on, "FormatterShowOutHeaders");
- assertEquals(true, fsoh.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowProperties", Boolean.TRUE));
- Boolean fsp = (Boolean) mbeanServer.getAttribute(on, "FormatterShowProperties");
- assertEquals(true, fsp.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowNode", Boolean.TRUE));
- Boolean fsn = (Boolean) mbeanServer.getAttribute(on, "FormatterShowNode");
- assertEquals(true, fsn.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowRouteId", Boolean.FALSE));
- Boolean fsr = (Boolean) mbeanServer.getAttribute(on, "FormatterShowRouteId");
- assertEquals(false, fsr.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowExchangePattern", Boolean.TRUE));
- Boolean fsep = (Boolean) mbeanServer.getAttribute(on, "FormatterShowExchangePattern");
- assertEquals(true, fsep.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterShowException", Boolean.TRUE));
- Boolean fsex = (Boolean) mbeanServer.getAttribute(on, "FormatterShowException");
- assertEquals(true, fsex.booleanValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterBreadCrumbLength", 100));
- Integer fbcl = (Integer) mbeanServer.getAttribute(on, "FormatterBreadCrumbLength");
- assertEquals(100, fbcl.intValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterNodeLength", 50));
- Integer fnl = (Integer) mbeanServer.getAttribute(on, "FormatterNodeLength");
- assertEquals(50, fnl.intValue());
-
- mbeanServer.setAttribute(on, new Attribute("FormatterMaxChars", 250));
- Integer fmc = (Integer) mbeanServer.getAttribute(on, "FormatterMaxChars");
- assertEquals(250, fmc.intValue());
+ poolSize = (Integer) mbeanServer.getAttribute(on, "PoolSize");
+ assertEquals(1, poolSize.intValue());
+
+ Integer largest = (Integer) mbeanServer.getAttribute(on, "LargestPoolSize");
+ assertEquals(1, largest.intValue());
+
+ Long completed = (Long) mbeanServer.getAttribute(on, "CompletedTaskCount");
+ assertEquals(1, completed.intValue());
}
@Override
@@ -168,7 +79,7 @@ public class ManagedTracerOptionsTest ex
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").to("mock:result");
+ from("direct:start").threads(15, 30).to("mock:result");
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java Fri Mar 19 11:39:01 2010
@@ -40,7 +40,7 @@ public class ManagedTracerOptionsTest ex
return context;
}
- public void testManagedErrorHandlerOptions() throws Exception {
+ public void testManagedTracerOptions() throws Exception {
MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=tracer,*"), null);
Modified: camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java (original)
+++ camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java Fri Mar 19 11:39:01 2010
@@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
@@ -129,4 +130,9 @@ public class OsgiServiceRegistry impleme
public void onErrorHandlerAdd(RouteContext routeContext, Processor processor, ErrorHandlerBuilder errorHandlerBuilder) {
// Do nothing here
}
+
+ public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) {
+ // Do nothing here
+ }
+
}
Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java Fri Mar 19 11:39:01 2010
@@ -17,6 +17,7 @@
package org.apache.camel.spring;
import java.util.Collection;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
@@ -70,4 +71,7 @@ public class DummyLifecycleStrategy impl
public void onRoutesAdd(Collection<Route> routes) {
}
+
+ public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) {
+ }
}