You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/16 07:00:41 UTC

svn commit: r923588 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/management/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/aggregate/ main/java/org/apache/camel/spi/ test/jav...

Author: davsclaus
Date: Tue Mar 16 06:00:41 2010
New Revision: 923588

URL: http://svn.apache.org/viewvc?rev=923588&view=rev
Log:
CAMEL-1588: Let ExecutorServiceStrategy handle shutting down thread pools on shutdown, which frees the EIPs from that burden. Improved the shutdown order a bit so that services are shut down later in the stop sequence.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Tue Mar 16 06:00:41 2010
@@ -1158,10 +1158,12 @@ public class DefaultCamelContext extends
         EventHelper.notifyCamelContextStarting(this);
 
         forceLazyInitialization();
-        startServices(components.values());
+        addService(executorServiceStrategy);
         addService(inflightRepository);
         addService(shutdownStrategy);
 
+        startServices(components.values());
+
         // To avoid initiating the routeDefinitions after stopping the camel context
         if (!routeDefinitionInitiated) {
             startRouteDefinitions(routeDefinitions);
@@ -1184,9 +1186,6 @@ public class DefaultCamelContext extends
 
         // the stop order is important
 
-        shutdownServices(servicesToClose);
-        servicesToClose.clear();
-
         shutdownServices(endpoints.values());
         endpoints.clear();
 
@@ -1199,7 +1198,9 @@ public class DefaultCamelContext extends
         } else {
             shutdownServices(producerServicePool);
         }
-        shutdownServices(inflightRepository);
+
+        shutdownServices(servicesToClose);
+        servicesToClose.clear();
 
         try {
             for (LifecycleStrategy strategy : lifecycleStrategies) {
@@ -1213,7 +1214,7 @@ public class DefaultCamelContext extends
         EventHelper.notifyCamelContextStopped(this);
 
         // shutdown management as the last one
-        shutdownServices(getManagementStrategy());
+        shutdownServices(managementStrategy);
 
         LOG.info("Apache Camel " + getVersion() + " (CamelContext:" + getName() + ") is shutdown");
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Tue Mar 16 06:00:41 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.impl;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -24,12 +25,16 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.CamelContext;
 import org.apache.camel.spi.ExecutorServiceStrategy;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision$
  */
-public class DefaultExecutorServiceStrategy implements ExecutorServiceStrategy {
+public class DefaultExecutorServiceStrategy extends ServiceSupport implements ExecutorServiceStrategy {
 
+    private static final Log LOG = LogFactory.getLog(DefaultExecutorServiceStrategy.class);
+    private final List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
     private final CamelContext camelContext;
     private String threadNamePattern = "Camel Thread ${counter} - ${name}";
 
@@ -50,39 +55,103 @@ public class DefaultExecutorServiceStrat
     }
 
     public ExecutorService lookup(Object source, String executorServiceRef) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Looking up ExecutorService with ref: " + executorServiceRef);
+        }
         return camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
     }
 
     public ExecutorService newCachedThreadPool(Object source, String name) {
-        return ExecutorServiceHelper.newCachedThreadPool(threadNamePattern, name, true);
+        ExecutorService answer = ExecutorServiceHelper.newCachedThreadPool(threadNamePattern, name, true);
+        onNewExecutorService(answer);
+        return answer;
     }
 
     public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
-        return ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, true);
+        ScheduledExecutorService answer = ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, true);
+        onNewExecutorService(answer);
+        return answer;
     }
 
     public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
-        return ExecutorServiceHelper.newFixedThreadPool(poolSize, threadNamePattern, name, true);
+        ExecutorService answer = ExecutorServiceHelper.newFixedThreadPool(poolSize, threadNamePattern, name, true);
+        onNewExecutorService(answer);
+        return answer;
     }
 
     public ExecutorService newSingleThreadExecutor(Object source, String name) {
-        return ExecutorServiceHelper.newSingleThreadExecutor(threadNamePattern, name, true);
+        ExecutorService answer = ExecutorServiceHelper.newSingleThreadExecutor(threadNamePattern, name, true);
+        onNewExecutorService(answer);
+        return answer;
     }
 
     public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize) {
-        return ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize);
+        ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize);
+        onNewExecutorService(answer);
+        return answer;
     }
 
     public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, boolean daemon) {
-        return ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
+        ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, daemon);
+        onNewExecutorService(answer);
+        return answer;
     }
 
     public void shutdown(ExecutorService executorService) {
+        if (executorService.isShutdown()) {
+            return;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Shutting down ExecutorService: " + executorService);
+        }
         executorService.shutdown();
     }
 
     public List<Runnable> shutdownNow(ExecutorService executorService) {
+        if (executorService.isShutdown()) {
+            return null;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Shutting down now ExecutorService: " + executorService);
+        }
         return executorService.shutdownNow();
     }
-    
+
+    /**
+     * Callback when a new {@link java.util.concurrent.ExecutorService} have been created.
+     *
+     * @param executorService the created {@link java.util.concurrent.ExecutorService} 
+     */
+    protected void onNewExecutorService(ExecutorService executorService) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created new ExecutorService: " + executorService);
+        }
+        executorServices.add(executorService);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        // shutdown all executor services
+        for (ExecutorService executorService : executorServices) {
+            // only log if something goes wrong as we want to shutdown them all
+            try {
+                shutdownNow(executorService);
+            } catch (Exception e) {
+                LOG.warn("Error occurred during shutdown of ExecutorService: "
+                        + executorService + ". This exception will be ignored.", e);
+            }
+        }
+        executorServices.clear();
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java Tue Mar 16 06:00:41 2010
@@ -263,7 +263,8 @@ public class DefaultManagementAgent exte
             try {
                 cs.stop();
             } catch (IOException e) {
-                // ignore
+                LOG.debug("Error occurred during stopping JMXConnectorService: "
+                        + cs + ". This exception will be ignored.");
             }
             cs = null;
         }
@@ -276,11 +277,11 @@ public class DefaultManagementAgent exte
         ObjectName[] mBeans = mbeansRegistered.toArray(new ObjectName[mbeansRegistered.size()]);
         int caught = 0;
         for (ObjectName name : mBeans) {
-            mbeansRegistered.remove(name);
             try {
+                mbeansRegistered.remove(name);
                 unregister(name);
-            } catch (JMException jmex) {
-                LOG.info("Exception unregistering MBean", jmex);
+            } catch (Exception e) {
+                LOG.info("Exception unregistering MBean with name " + name, e);
                 caught++;
             }
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Mar 16 06:00:41 2010
@@ -363,14 +363,6 @@ public class MulticastProcessor extends 
         ServiceHelper.stopServices(processors);
     }
 
-    @Override
-    protected void doShutdown() throws Exception {
-        // only shutdown thread pool on shutdown
-        if (executorService != null) {
-            camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
-        }
-    }
-
     private static void setToEndpoint(Exchange exchange, Processor processor) {
         if (processor instanceof Producer) {
             Producer producer = (Producer) processor;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Tue Mar 16 06:00:41 2010
@@ -69,14 +69,6 @@ public class OnCompletionProcessor exten
         ServiceHelper.stopService(processor);
     }
 
-    @Override
-    protected void doShutdown() throws Exception {
-        // only shutdown thread pool on shutdown
-        if (executorService != null) {
-            camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
-        }
-    }
-
     public void process(Exchange exchange) throws Exception {
         if (processor == null) {
             return;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Tue Mar 16 06:00:41 2010
@@ -101,15 +101,6 @@ public class ThreadsProcessor extends De
         };
     }
 
-    @Override
-    protected void doShutdown() throws Exception {
-        super.doShutdown();
-        // only shutdown thread pool on shutdown
-        if (executorService != null) {
-            camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
-        }
-    }
-
     public String toString() {
         return "Threads";
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Tue Mar 16 06:00:41 2010
@@ -61,15 +61,6 @@ public class WireTapProcessor extends Se
     }
 
     @Override
-    protected void doShutdown() throws Exception {
-        super.doShutdown();
-        // only shutdown thread pool on shutdown
-        if (executorService != null) {
-            camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
-        }
-    }
-
-    @Override
     public String toString() {
         return "WireTap[" + destination.getEndpointUri() + "]";
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Tue Mar 16 06:00:41 2010
@@ -487,10 +487,4 @@ public class AggregateProcessor extends 
         }
     }
 
-    @Override
-    protected void doShutdown() throws Exception {
-        // only shutdown thread pool when we are shutting down
-        camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
-    }
-
-}
+}
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Tue Mar 16 06:00:41 2010
@@ -21,6 +21,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.ShutdownableService;
+
 /**
  * Strategy to create thread pools.
  * <p/>
@@ -34,7 +36,7 @@ import java.util.concurrent.TimeUnit;
  *
  * @version $Revision$
  */
-public interface ExecutorServiceStrategy {
+public interface ExecutorServiceStrategy extends ShutdownableService {
 
     /**
      * Creates a full thread name

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java Tue Mar 16 06:00:41 2010
@@ -50,8 +50,8 @@ public class MultipleLifecycleStrategyTe
 
         context.stop();
 
-        assertEquals(8, dummy1.getEvents().size());
-        assertEquals(8, dummy2.getEvents().size());
+        assertEquals(9, dummy1.getEvents().size());
+        assertEquals(9, dummy2.getEvents().size());
 
         assertEquals("onContextStart", dummy1.getEvents().get(0));
         assertEquals("onContextStart", dummy2.getEvents().get(0));
@@ -61,14 +61,16 @@ public class MultipleLifecycleStrategyTe
         assertEquals("onServiceAdd", dummy2.getEvents().get(2));
         assertEquals("onServiceAdd", dummy1.getEvents().get(3));
         assertEquals("onServiceAdd", dummy2.getEvents().get(3));
-        assertEquals("onComponentAdd", dummy1.getEvents().get(4));
-        assertEquals("onComponentAdd", dummy2.getEvents().get(4));
-        assertEquals("onEndpointAdd", dummy1.getEvents().get(5));
-        assertEquals("onEndpointAdd", dummy2.getEvents().get(5));
-        assertEquals("onComponentRemove", dummy1.getEvents().get(6));
-        assertEquals("onComponentRemove", dummy2.getEvents().get(6));
-        assertEquals("onContextStop", dummy1.getEvents().get(7));
-        assertEquals("onContextStop", dummy2.getEvents().get(7));
+        assertEquals("onServiceAdd", dummy1.getEvents().get(4));
+        assertEquals("onServiceAdd", dummy2.getEvents().get(4));
+        assertEquals("onComponentAdd", dummy1.getEvents().get(5));
+        assertEquals("onComponentAdd", dummy2.getEvents().get(5));
+        assertEquals("onEndpointAdd", dummy1.getEvents().get(6));
+        assertEquals("onEndpointAdd", dummy2.getEvents().get(6));
+        assertEquals("onComponentRemove", dummy1.getEvents().get(7));
+        assertEquals("onComponentRemove", dummy2.getEvents().get(7));
+        assertEquals("onContextStop", dummy1.getEvents().get(8));
+        assertEquals("onContextStop", dummy2.getEvents().get(8));
     }
 
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java?rev=923588&r1=923587&r2=923588&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateShutdownThreadPoolTest.java Tue Mar 16 06:00:41 2010
@@ -21,14 +21,13 @@ import java.util.concurrent.ExecutorServ
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * @version $Revision$
  */
 public class AggregateShutdownThreadPoolTest extends ContextTestSupport {
 
-    private ExecutorService myPool = ExecutorServiceHelper.newCachedThreadPool(null, "myPool", true);
+    private ExecutorService myPool;
 
     public void testAggregateShutdownDefaultThreadPoolTest() throws Exception {
         getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
@@ -95,6 +94,8 @@ public class AggregateShutdownThreadPool
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                myPool = context.getExecutorServiceStrategy().newCachedThreadPool(this, "myPool");
+
                 from("direct:foo").routeId("foo")
                     .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(3)
                         .to("mock:aggregated");