You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/16 07:00:41 UTC
svn commit: r923588 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/management/
main/java/org/apache/camel/processor/
main/java/org/apache/camel/processor/aggregate/
main/java/org/apache/camel/spi/ test/jav...
Author: davsclaus
Date: Tue Mar 16 06:00:41 2010
New Revision: 923588
URL: http://svn.apache.org/viewvc?rev=923588&view=rev
Log:
CAMEL-1588: Let ExecutorServiceStrategy handle shutting down thread pools on shutdown, which relieves the EIPs of that burden. Improved the shutdown order a bit so that services are shut down later.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Tue Mar 16 06:00:41 2010
@@ -1158,10 +1158,12 @@ public class DefaultCamelContext extends
EventHelper.notifyCamelContextStarting(this);
forceLazyInitialization();
- startServices(components.values());
+ addService(executorServiceStrategy);
addService(inflightRepository);
addService(shutdownStrategy);
+ startServices(components.values());
+
// To avoid initiating the routeDefinitions after stopping the camel context
if (!routeDefinitionInitiated) {
startRouteDefinitions(routeDefinitions);
@@ -1184,9 +1186,6 @@ public class DefaultCamelContext extends
// the stop order is important
- shutdownServices(servicesToClose);
- servicesToClose.clear();
-
shutdownServices(endpoints.values());
endpoints.clear();
@@ -1199,7 +1198,9 @@ public class DefaultCamelContext extends
} else {
shutdownServices(producerServicePool);
}
- shutdownServices(inflightRepository);
+
+ shutdownServices(servicesToClose);
+ servicesToClose.clear();
try {
for (LifecycleStrategy strategy : lifecycleStrategies) {
@@ -1213,7 +1214,7 @@ public class DefaultCamelContext extends
EventHelper.notifyCamelContextStopped(this);
// shutdown management as the last one
- shutdownServices(getManagementStrategy());
+ shutdownServices(managementStrategy);
LOG.info("Apache Camel " + getVersion() + " (CamelContext:" + getName() + ") is shutdown");
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Tue Mar 16 06:00:41 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.impl;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -24,12 +25,16 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.spi.ExecutorServiceStrategy;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @version $Revision$
*/
-public class DefaultExecutorServiceStrategy implements ExecutorServiceStrategy {
+public class DefaultExecutorServiceStrategy extends ServiceSupport implements ExecutorServiceStrategy {
+ private static final Log LOG = LogFactory.getLog(DefaultExecutorServiceStrategy.class);
+ private final List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
private final CamelContext camelContext;
private String threadNamePattern = "Camel Thread ${counter} - ${name}";
@@ -50,39 +55,103 @@ public class DefaultExecutorServiceStrat
}
public ExecutorService lookup(Object source, String executorServiceRef) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Looking up ExecutorService with ref: " + executorServiceRef);
+ }
return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
}
public ExecutorService newCachedThreadPool(Object source, String name) {
- return ExecutorServiceHelper.newCachedThreadPool(threadNamePattern, name, true);
+ ExecutorService answer = ExecutorServiceHelper.newCachedThreadPool(threadNamePattern, name, true);
+ onNewExecutorService(answer);
+ return answer;
}
public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
- return ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, true);
+ ScheduledExecutorService answer = ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, true);
+ onNewExecutorService(answer);
+ return answer;
}
public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
- return ExecutorServiceHelper.newFixedThreadPool(poolSize, threadNamePattern, name, true);
+ ExecutorService answer = ExecutorServiceHelper.newFixedThreadPool(poolSize, threadNamePattern, name, true);
+ onNewExecutorService(answer);
+ return answer;
}
public ExecutorService newSingleThreadExecutor(Object source, String name) {
- return ExecutorServiceHelper.newSingleThreadExecutor(threadNamePattern, name, true);
+ ExecutorService answer = ExecutorServiceHelper.newSingleThreadExecutor(threadNamePattern, name, true);
+ onNewExecutorService(answer);
+ return answer;
}
public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize) {
- return ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize);
+ ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize);
+ onNewExecutorService(answer);
+ return answer;
}
public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
- return ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
+ ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
+ onNewExecutorService(answer);
+ return answer;
}
public void shutdown(ExecutorService executorService) {
+ if (executorService.isShutdown()) {
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Shutting down ExecutorService: " + executorService);
+ }
executorService.shutdown();
}
public List<Runnable> shutdownNow(ExecutorService executorService) {
+ if (executorService.isShutdown()) {
+ return null;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Shutting down now ExecutorService: " + executorService);
+ }
return executorService.shutdownNow();
}
-
+
+ /**
+ * Callback when a new {@link java.util.concurrent.ExecutorService} has been created.
+ *
+ * @param executorService the created {@link java.util.concurrent.ExecutorService}
+ */
+ protected void onNewExecutorService(ExecutorService executorService) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created new ExecutorService: " + executorService);
+ }
+ executorServices.add(executorService);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // noop
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // noop
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ // shutdown all executor services
+ for (ExecutorService executorService : executorServices) {
+ // only log if something goes wrong, as we want to shut them all down
+ try {
+ shutdownNow(executorService);
+ } catch (Exception e) {
+ LOG.warn("Error occurred during shutdown of ExecutorService: "
+ + executorService + ". This exception will be ignored.", e);
+ }
+ }
+ executorServices.clear();
+ }
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java Tue Mar 16 06:00:41 2010
@@ -263,7 +263,8 @@ public class DefaultManagementAgent exte
try {
cs.stop();
} catch (IOException e) {
- // ignore
+ LOG.debug("Error occurred during stopping JMXConnectorService: "
+ + cs + ". This exception will be ignored.");
}
cs = null;
}
@@ -276,11 +277,11 @@ public class DefaultManagementAgent exte
ObjectName[] mBeans = mbeansRegistered.toArray(new ObjectName[mbeansRegistered.size()]);
int caught = 0;
for (ObjectName name : mBeans) {
- mbeansRegistered.remove(name);
try {
+ mbeansRegistered.remove(name);
unregister(name);
- } catch (JMException jmex) {
- LOG.info("Exception unregistering MBean", jmex);
+ } catch (Exception e) {
+ LOG.info("Exception unregistering MBean with name " + name, e);
caught++;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Mar 16 06:00:41 2010
@@ -363,14 +363,6 @@ public class MulticastProcessor extends
ServiceHelper.stopServices(processors);
}
- @Override
- protected void doShutdown() throws Exception {
- // only shutdown thread pool on shutdown
- if (executorService != null) {
- camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
- }
- }
-
private static void setToEndpoint(Exchange exchange, Processor processor) {
if (processor instanceof Producer) {
Producer producer = (Producer) processor;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Tue Mar 16 06:00:41 2010
@@ -69,14 +69,6 @@ public class OnCompletionProcessor exten
ServiceHelper.stopService(processor);
}
- @Override
- protected void doShutdown() throws Exception {
- // only shutdown thread pool on shutdown
- if (executorService != null) {
- camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
- }
- }
-
public void process(Exchange exchange) throws Exception {
if (processor == null) {
return;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Tue Mar 16 06:00:41 2010
@@ -101,15 +101,6 @@ public class ThreadsProcessor extends De
};
}
- @Override
- protected void doShutdown() throws Exception {
- super.doShutdown();
- // only shutdown thread pool on shutdown
- if (executorService != null) {
- camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
- }
- }
-
public String toString() {
return "Threads";
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Tue Mar 16 06:00:41 2010
@@ -61,15 +61,6 @@ public class WireTapProcessor extends Se
}
@Override
- protected void doShutdown() throws Exception {
- super.doShutdown();
- // only shutdown thread pool on shutdown
- if (executorService != null) {
- camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
- }
- }
-
- @Override
public String toString() {
return "WireTap[" + destination.getEndpointUri() + "]";
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Tue Mar 16 06:00:41 2010
@@ -487,10 +487,4 @@ public class AggregateProcessor extends
}
}
- @Override
- protected void doShutdown() throws Exception {
- // only shutdown thread pool when we are shutting down
- camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
- }
-
-}
+}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Tue Mar 16 06:00:41 2010
@@ -21,6 +21,8 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.ShutdownableService;
+
/**
* Strategy to create thread pools.
* <p/>
@@ -34,7 +36,7 @@ import java.util.concurrent.TimeUnit;
*
* @version $Revision$
*/
-public interface ExecutorServiceStrategy {
+public interface ExecutorServiceStrategy extends ShutdownableService {
/**
* Creates a full thread name
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java Tue Mar 16 06:00:41 2010
@@ -50,8 +50,8 @@ public class MultipleLifecycleStrategyTe
context.stop();
- assertEquals(8, dummy1.getEvents().size());
- assertEquals(8, dummy2.getEvents().size());
+ assertEquals(9, dummy1.getEvents().size());
+ assertEquals(9, dummy2.getEvents().size());
assertEquals("onContextStart", dummy1.getEvents().get(0));
assertEquals("onContextStart", dummy2.getEvents().get(0));
@@ -61,14 +61,16 @@ public class MultipleLifecycleStrategyTe
assertEquals("onServiceAdd", dummy2.getEvents().get(2));
assertEquals("onServiceAdd", dummy1.getEvents().get(3));
assertEquals("onServiceAdd", dummy2.getEvents().get(3));
- assertEquals("onComponentAdd", dummy1.getEvents().get(4));
- assertEquals("onComponentAdd", dummy2.getEvents().get(4));
- assertEquals("onEndpointAdd", dummy1.getEvents().get(5));
- assertEquals("onEndpointAdd", dummy2.getEvents().get(5));
- assertEquals("onComponentRemove", dummy1.getEvents().get(6));
- assertEquals("onComponentRemove", dummy2.getEvents().get(6));
- assertEquals("onContextStop", dummy1.getEvents().get(7));
- assertEquals("onContextStop", dummy2.getEvents().get(7));
+ assertEquals("onServiceAdd", dummy1.getEvents().get(4));
+ assertEquals("onServiceAdd", dummy2.getEvents().get(4));
+ assertEquals("onComponentAdd", dummy1.getEvents().get(5));
+ assertEquals("onComponentAdd", dummy2.getEvents().get(5));
+ assertEquals("onEndpointAdd", dummy1.getEvents().get(6));
+ assertEquals("onEndpointAdd", dummy2.getEvents().get(6));
+ assertEquals("onComponentRemove", dummy1.getEvents().get(7));
+ assertEquals("onComponentRemove", dummy2.getEvents().get(7));
+ assertEquals("onContextStop", dummy1.getEvents().get(8));
+ assertEquals("onContextStop", dummy2.getEvents().get(8));
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java Tue Mar 16 06:00:41 2010
@@ -21,14 +21,13 @@ import java.util.concurrent.ExecutorServ
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.BodyInAggregatingStrategy;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* @version $Revision$
*/
public class AggregateShutdownThreadPoolTest extends ContextTestSupport {
- private ExecutorService myPool = ExecutorServiceHelper.newCachedThreadPool(null, "myPool", true);
+ private ExecutorService myPool;
public void testAggregateShutdownDefaultThreadPoolTest() throws Exception {
getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
@@ -95,6 +94,8 @@ public class AggregateShutdownThreadPool
return new RouteBuilder() {
@Override
public void configure() throws Exception {
+ myPool = context.getExecutorServiceStrategy().newCachedThreadPool(this, "myPool");
+
from("direct:foo").routeId("foo")
.aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(3)
.to("mock:aggregated");