You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/04/10 11:54:36 UTC
svn commit: r1466410 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ test/java/org/apache/camel/impl/
test/java/org/apache/camel/management/
Author: davsclaus
Date: Wed Apr 10 09:54:36 2013
New Revision: 1466410
URL: http://svn.apache.org/r1466410
Log:
CAMEL-6259: Fixed so scheduled thread pools get unregister from jmx.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRemoveRouteAggregateThreadPoolTest.java
- copied, changed from r1466341, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java?rev=1466410&r1=1466409&r2=1466410&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java Wed Apr 10 09:54:36 2013
@@ -323,8 +323,14 @@ public class DefaultExecutorServiceManag
}
}
+ // let lifecycle strategy be notified as well which can let it be managed in JMX as well
+ ThreadPoolExecutor threadPool = null;
if (executorService instanceof ThreadPoolExecutor) {
- ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService;
+ threadPool = (ThreadPoolExecutor) executorService;
+ } else if (executorService instanceof SizedScheduledExecutorService) {
+ threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
+ }
+ if (threadPool != null) {
for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
lifecycle.onThreadPoolRemove(camelContext, threadPool);
}
@@ -361,8 +367,14 @@ public class DefaultExecutorServiceManag
}
}
+ // let lifecycle strategy be notified as well which can let it be managed in JMX as well
+ ThreadPoolExecutor threadPool = null;
if (executorService instanceof ThreadPoolExecutor) {
- ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService;
+ threadPool = (ThreadPoolExecutor) executorService;
+ } else if (executorService instanceof SizedScheduledExecutorService) {
+ threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
+ }
+ if (threadPool != null) {
for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
lifecycle.onThreadPoolRemove(camelContext, threadPool);
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java?rev=1466410&r1=1466409&r2=1466410&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java Wed Apr 10 09:54:36 2013
@@ -52,7 +52,7 @@ public class MultipleLifecycleStrategyTe
List<String> expectedEvents = Arrays.asList("onContextStart", "onServiceAdd", "onServiceAdd", "onServiceAdd",
"onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
- "onThreadPoolAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
+ "onThreadPoolAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop", "onThreadPoolRemove");
assertEquals(expectedEvents, dummy1.getEvents());
assertEquals(expectedEvents, dummy2.getEvents());
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRemoveRouteAggregateThreadPoolTest.java (from r1466341, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRemoveRouteAggregateThreadPoolTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRemoveRouteAggregateThreadPoolTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java&r1=1466341&r2=1466410&rev=1466410&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRemoveRouteAggregateThreadPoolTest.java Wed Apr 10 09:54:36 2013
@@ -17,54 +17,30 @@
package org.apache.camel.management;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
* @version
*/
-public class ManagedRouteRemoveWireTapExplicitThreadPoolTest extends ManagementTestSupport {
-
- private ExecutorService myThreadPool;
+public class ManagedRemoveRouteAggregateThreadPoolTest extends ManagementTestSupport {
public void testRemove() throws Exception {
MBeanServer mbeanServer = getMBeanServer();
ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"foo\"");
getMockEndpoint("mock:result").expectedMessageCount(1);
- getMockEndpoint("mock:tap").expectedMessageCount(1);
-
- template.sendBody("seda:foo", "Hello World");
-
+ template.sendBody("direct:foo", "Hello World");
assertMockEndpointsSatisfied();
- // should be started
- String state = (String) mbeanServer.getAttribute(on, "State");
- assertEquals("Should be started", ServiceStatus.Started.name(), state);
-
- // and no wire tap thread pool as we use an existing external pool
- Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
- boolean wireTap = false;
- for (ObjectName name : set) {
- if (name.toString().contains("wireTap")) {
- wireTap = true;
- break;
- }
- }
- assertFalse("Should not have a wire tap thread pool", wireTap);
+ // remember number of thread pools before
+ Set<ObjectName> before = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
- // stop
+ // stop and remove route
mbeanServer.invoke(on, "stop", null, null);
-
- state = (String) mbeanServer.getAttribute(on, "State");
- assertEquals("Should be stopped", ServiceStatus.Stopped.name(), state);
-
- // remove
mbeanServer.invoke(on, "remove", null, null);
// should not be registered anymore
@@ -72,20 +48,10 @@ public class ManagedRouteRemoveWireTapEx
assertFalse("Route mbean should have been unregistered", registered);
// and no wire tap thread pool as we use an existing external pool
- set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
- wireTap = false;
- for (ObjectName name : set) {
- if (name.toString().contains("wireTap")) {
- wireTap = true;
- break;
- }
- }
- assertFalse("Should not have a wire tap thread pool", wireTap);
+ Set<ObjectName> after = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null);
- // should not be shutdown
- assertFalse("Thread pool should not be shutdown", myThreadPool.isShutdown());
-
- myThreadPool.shutdownNow();
+ // there should be 1 less thread pool
+ assertTrue("There should be one less thread pool", before.size() - 1 == after.size());
}
@Override
@@ -93,11 +59,9 @@ public class ManagedRouteRemoveWireTapEx
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // create a new thread pool to use for wire tap
- myThreadPool = Executors.newFixedThreadPool(1);
-
- from("seda:foo").routeId("foo").wireTap("direct:tap").executorService(myThreadPool).to("mock:result");
- from("direct:tap").routeId("tap").to("mock:tap");
+ from("direct:foo").routeId("foo")
+ .aggregate(constant(true), new UseLatestAggregationStrategy()).completionTimeout(1000)
+ .to("mock:result");
}
};
}