You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/30 09:39:26 UTC

svn commit: r1237570 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/direct/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/proc...

Author: davsclaus
Date: Mon Jan 30 08:39:25 2012
New Revision: 1237570

URL: http://svn.apache.org/viewvc?rev=1237570&view=rev
Log:
CAMEL-4953: Added ShutdownPrepared interface to allow Services in routes to do custom shutdown logic. For example the Aggregate EIP.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java
      - copied, changed from r1237241, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java
    camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java Mon Jan 30 08:39:25 2012
@@ -77,7 +77,7 @@ public class DirectConsumer extends Defa
         return 0;
     }
 
-    public void prepareShutdown() {
+    public void prepareShutdown(boolean forced) {
         // noop
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Mon Jan 30 08:39:25 2012
@@ -101,7 +101,8 @@ public class SedaConsumer extends Servic
         return endpoint.getQueue().size();
     }
 
-    public void prepareShutdown() {
+    @Override
+    public void prepareShutdown(boolean forced) {
         // signal we want to shutdown
         shutdownPending = true;
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Mon Jan 30 08:39:25 2012
@@ -1546,7 +1546,7 @@ public class DefaultCamelContext extends
 
         // stop route inputs in the same order as they was started so we stop the very first inputs first
         try {
-            // force shutting down routes as they mau otherwise cause shutdown to hang
+            // force shutting down routes as they may otherwise cause shutdown to hang
             shutdownStrategy.shutdownForced(this, getRouteStartupOrder());
         } catch (Throwable e) {
             log.warn("Error occurred while shutting down routes. This exception will be ignored.", e);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java Mon Jan 30 08:39:25 2012
@@ -17,11 +17,13 @@
 package org.apache.camel.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Route;
+import org.apache.camel.Service;
 import org.apache.camel.spi.RouteStartupOrder;
 
 /**
@@ -58,6 +60,15 @@ public class DefaultRouteStartupOrder im
         return answer;
     }
 
+    public List<Service> getServices() {
+        List<Service> answer = new ArrayList<Service>();
+        Collection<Route> routes = routeService.getRoutes();
+        for (Route route : routes) {
+            answer.addAll(route.getServices());
+        }
+        return answer;
+    }
+
     public RouteService getRouteService() {
         return routeService;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Mon Jan 30 08:39:25 2012
@@ -19,7 +19,9 @@ package org.apache.camel.impl;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -36,6 +38,7 @@ import org.apache.camel.ShutdownRunningT
 import org.apache.camel.SuspendableService;
 import org.apache.camel.spi.RouteStartupOrder;
 import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.spi.ShutdownPrepared;
 import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.EventHelper;
@@ -51,6 +54,23 @@ import org.slf4j.LoggerFactory;
  * Graceful shutdown ensures that any inflight and pending messages will be taken into account
  * and it will wait until these exchanges has been completed.
  * <p/>
+ * This strategy will perform graceful shutdown in two steps:
+ * <ul>
+ *     <li>Graceful - By suspending/stopping consumers, and let any in-flight exchanges complete</li>
+ *     <li>Forced - After a given period of time, a timeout occurred and if there are still pending
+ *     exchanges to complete, then a more aggressive forced strategy is performed.</li>
+ * </ul>
+ * The idea of the <tt>graceful</tt> shutdown strategy is to stop taking in new messages
+ * and allow any existing inflight messages to complete. When there are no more inflight messages,
+ * the routes can be fully shut down. This means that if there are inflight messages, we will have
+ * to wait for these messages to complete. If they do not complete within a period of time, a
+ * timeout triggers, and a more aggressive strategy takes over.
+ * <p/>
+ * The idea of the <tt>forced</tt> shutdown strategy is to stop processing messages
+ * and force routes and their services to shut down now. There is a risk when shutting down now
+ * that some resources are not properly shut down, which can cause side effects. The timeout value
+ * is by default 300 seconds, but can be customized.
+ * <p/>
  * As this strategy will politely wait until all exchanges has been completed it can potential wait
  * for a long time, and hence why a timeout value can be set. When the timeout triggers you can also
  * specify whether the remainder consumers should be shutdown now or ignore.
@@ -62,6 +82,15 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * Routes will by default be shutdown in the reverse order of which they where started.
  * You can customize this using the {@link #setShutdownRoutesInReverseOrder(boolean)} method.
+ * <p/>
+ * After route consumers have been shut down, any {@link ShutdownPrepared} services on the routes
+ * are prepared for shutdown by invoking {@link ShutdownPrepared#prepareShutdown(boolean)} with
+ * <tt>force=false</tt>.
+ * <p/>
+ * Then if a timeout occurred and the strategy has been configured with shutdown-now on timeout, then
+ * the strategy performs a more aggressive forced shutdown, by forcing all consumers to shutdown
+ * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean)} with <tt>force=true</tt>
+ * on the services. This allows the services to know they should force shutdown now.
  *
  * @version 
  */
@@ -156,6 +185,13 @@ public class DefaultShutdownStrategy ext
                     LOG.warn("Timeout occurred. Now forcing the routes to be shutdown now.");
                     // force the routes to shutdown now
                     shutdownRoutesNow(routesOrdered);
+
+                    // now that the route consumers have been shut down, prepare route services for shutdown now (forced)
+                    for (RouteStartupOrder order : routes) {
+                        for (Service service : order.getServices()) {
+                            prepareShutdown(service, true, true);
+                        }
+                    }
                 } else {
                     LOG.warn("Timeout occurred. Will ignore shutting down the remainder routes.");
                 }
@@ -319,6 +355,38 @@ public class DefaultShutdownStrategy ext
         }
     }
 
+    /**
+     * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean)} method
+     * on the service if it implement this interface.
+     * 
+     * @param service the service
+     * @param forced  whether to force shutdown
+     * @param includeChildren whether to prepare the children of the service as well
+     */
+    private static void prepareShutdown(Service service, boolean forced, boolean includeChildren) {
+        Set<Service> list;
+        if (includeChildren) {
+            list = ServiceHelper.getChildServices(service);
+        } else {
+            list = new LinkedHashSet<Service>(1);
+            list.add(service);
+        }
+
+        for (Service child : list) {
+            if (child instanceof ShutdownPrepared) {
+                try {
+                    LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child);
+                    ((ShutdownPrepared) child).prepareShutdown(forced);
+                } catch (Exception e) {
+                    LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Holder for deferred consumers
+     */
     static class ShutdownDeferredConsumer {
         private final Route route;
         private final Consumer consumer;
@@ -372,7 +440,6 @@ public class DefaultShutdownStrategy ext
             // list of deferred consumers to shutdown when all exchanges has been completed routed
             // and thus there are no more inflight exchanges so they can be safely shutdown at that time
             List<ShutdownDeferredConsumer> deferredConsumers = new ArrayList<ShutdownDeferredConsumer>();
-
             for (RouteStartupOrder order : routes) {
 
                 ShutdownRoute shutdownRoute = order.getRoute().getRouteContext().getShutdownRoute();
@@ -469,7 +536,8 @@ public class DefaultShutdownStrategy ext
                 Consumer consumer = deferred.getConsumer();
                 if (consumer instanceof ShutdownAware) {
                     LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
-                    ((ShutdownAware) consumer).prepareShutdown();
+                    boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
+                    prepareShutdown(consumer, forced, false);
                     LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
                 }
             }
@@ -485,6 +553,14 @@ public class DefaultShutdownStrategy ext
                     LOG.info("Route: {} shutdown complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
                 }
             }
+
+            // now that the route consumers have been shut down, prepare route services for shutdown
+            for (RouteStartupOrder order : routes) {
+                for (Service service : order.getServices()) {
+                    boolean forced = context.getShutdownStrategy().forceShutdown(service);
+                    prepareShutdown(service, forced, true);
+                }
+            }
         }
 
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Mon Jan 30 08:39:25 2012
@@ -134,7 +134,7 @@ public class RouteService extends ChildS
                 // gather list of services to start as we need to start child services as well
                 Set<Service> list = new LinkedHashSet<Service>();
                 for (Service service : services) {
-                    doGetChildServices(list, service);
+                    list.addAll(ServiceHelper.getChildServices(service));
                 }
 
                 // split into consumers and child services as we need to start the consumers
@@ -202,7 +202,7 @@ public class RouteService extends ChildS
             // gather list of services to stop as we need to start child services as well
             Set<Service> list = new LinkedHashSet<Service>();
             for (Service service : services) {
-                doGetChildServices(list, service);
+                list.addAll(ServiceHelper.getChildServices(service));
             }
             stopChildService(route, list, isShutdownCamelContext);
 
@@ -240,7 +240,7 @@ public class RouteService extends ChildS
             // gather list of services to stop as we need to start child services as well
             Set<Service> list = new LinkedHashSet<Service>();
             for (Service service : services) {
-                doGetChildServices(list, service);
+                list.addAll(ServiceHelper.getChildServices(service));
             }
 
             // shutdown services
@@ -351,23 +351,4 @@ public class RouteService extends ChildS
         }
     }
 
-    /**
-     * Gather all child services by navigating the service to recursively gather all child services.
-     */
-    private static void doGetChildServices(Set<Service> services, Service service) throws Exception {
-        services.add(service);
-
-        if (service instanceof Navigate) {
-            Navigate<?> nav = (Navigate<?>) service;
-            if (nav.hasNext()) {
-                List<?> children = nav.next();
-                for (Object child : children) {
-                    if (child instanceof Service) {
-                        doGetChildServices(services, (Service) child);
-                    }
-                }
-            }
-        }
-    }
-
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java Mon Jan 30 08:39:25 2012
@@ -74,7 +74,7 @@ public abstract class ScheduledBatchPoll
     }
 
     @Override
-    public void prepareShutdown() {
+    public void prepareShutdown(boolean forced) {
         // reset task as the state of the task is not to be preserved
         // which otherwise may cause isBatchAllowed() to return a wrong answer
         this.shutdownRunningTask = null;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Mon Jan 30 08:39:25 2012
@@ -47,6 +47,7 @@ import org.apache.camel.impl.LoggingExce
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.spi.ShutdownPrepared;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultTimeoutMap;
 import org.apache.camel.support.ServiceSupport;
@@ -74,7 +75,7 @@ import org.slf4j.LoggerFactory;
  * and older prices are discarded). Another idea is to combine line item messages
  * together into a single invoice message.
  */
-public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
+public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable, ShutdownPrepared {
 
     public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
 
@@ -879,15 +880,9 @@ public class AggregateProcessor extends 
 
     @Override
     protected void doStop() throws Exception {
-
-        if (forceCompletionOnStop) {
-            forceCompletionOfAllGroups();
-
-            while (inProgressCompleteExchanges.size() > 0) {
-                LOG.trace("waiting for {} in progress exchanges to complete", inProgressCompleteExchanges.size());
-                Thread.sleep(100);
-            }
-        }
+        // note: we cannot do doForceCompletionOnStop from this doStop method
+        // as this is handled in the prepareShutdown method which is also invoked when stopping a route
+        // and is better suited for preparing to shutdown than this doStop method is
 
         if (recoverService != null) {
             camelContext.getExecutorServiceManager().shutdownNow(recoverService);
@@ -904,6 +899,37 @@ public class AggregateProcessor extends 
     }
 
     @Override
+    public void prepareShutdown(boolean forced) {
+        // we are shutting down, so force completion if this option was enabled
+        // but only do this when forced=false, as that is when we have chance to
+        // send out new messages to be routed by Camel. When forced=true, then
+        // we have to shutdown in a hurry
+        if (!forced && forceCompletionOnStop) {
+            doForceCompletionOnStop();
+        }
+    }
+
+    private void doForceCompletionOnStop() {
+        int expected = forceCompletionOfAllGroups();
+
+        StopWatch watch = new StopWatch();
+        while (inProgressCompleteExchanges.size() > 0) {
+            LOG.trace("Waiting for {} inflight exchanges to complete", inProgressCompleteExchanges.size());
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // break out as we got interrupted such as the JVM terminating
+                LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", inProgressCompleteExchanges.size());
+                break;
+            }
+        }
+
+        if (expected > 0) {
+            LOG.info("Forcing completion of all groups with {} exchanges completed in {}", expected, TimeUtils.printDuration(watch.stop()));
+        }
+    }
+
+    @Override
     protected void doShutdown() throws Exception {
         // shutdown aggregation repository
         ServiceHelper.stopService(aggregationRepository);
@@ -914,12 +940,13 @@ public class AggregateProcessor extends 
         super.doShutdown();
     }
 
-    public void forceCompletionOfAllGroups() {
+    public int forceCompletionOfAllGroups() {
 
-        // only run if CamelContext has been fully started
-        if (!camelContext.getStatus().isStarted()) {
+        // only run if CamelContext has been fully started or is stopping
+        boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping();
+        if (!allow) {
             LOG.warn("cannot start force completion because CamelContext({}) has not been started yet", camelContext.getName());
-            return;
+            return 0;
         }
 
         LOG.trace("Starting force completion of all groups task");
@@ -927,9 +954,11 @@ public class AggregateProcessor extends 
         // trigger completion for all in the repository
         Set<String> keys = aggregationRepository.getKeys();
 
+        int total = 0;
         if (keys != null && !keys.isEmpty()) {
             // must acquire the shared aggregation lock to be able to trigger force completion
             lock.lock();
+            total = keys.size();
             try {
                 for (String key : keys) {
                     Exchange exchange = aggregationRepository.get(camelContext, key);
@@ -944,7 +973,12 @@ public class AggregateProcessor extends 
                 lock.unlock();
             }
         }
-
         LOG.trace("Completed force completion of all groups task");
+
+        if (total > 0) {
+            LOG.debug("Forcing completion of all groups with {} exchanges", total);
+        }
+        return total;
     }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java Mon Jan 30 08:39:25 2012
@@ -319,7 +319,7 @@ public class DefaultChannel extends Serv
         // determine if we can still run, or the camel context is forcing a shutdown
         boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
         if (forceShutdown) {
-            LOG.trace("Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: {}", exchange);
+            LOG.debug("Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: {}", exchange);
             if (exchange.getException() == null) {
                 exchange.setException(new RejectedExecutionException());
             }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java Mon Jan 30 08:39:25 2012
@@ -20,6 +20,7 @@ import java.util.List;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Route;
+import org.apache.camel.Service;
 
 /**
  * Information about a route to be started where we want to control the order
@@ -53,4 +54,11 @@ public interface RouteStartupOrder {
      */
     List<Consumer> getInputs();
 
+    /**
+     * Gets the services of this route.
+     *
+     * @return the services.
+     */
+    List<Service> getServices();
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java Mon Jan 30 08:39:25 2012
@@ -27,7 +27,7 @@ import org.apache.camel.ShutdownRunningT
  * @version 
  * @see org.apache.camel.spi.ShutdownStrategy
  */
-public interface ShutdownAware {
+public interface ShutdownAware extends ShutdownPrepared {
 
     /**
      * To defer shutdown during first phase of shutdown. This allows any pending exchanges to be completed
@@ -53,10 +53,4 @@ public interface ShutdownAware {
      */
     int getPendingExchangesSize();
 
-    /**
-     * Prepares the consumer for shutdown.
-     * <p/>
-     * For example by graceful stopping any threads or the likes.
-     */
-    void prepareShutdown();
 }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java?rev=1237570&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java Mon Jan 30 08:39:25 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * Allows a {@link org.apache.camel.Service} to prepare for shutdown.
+ * <p/>
+ * <b>Important: </b> Implementors of this interface must be a {@link org.apache.camel.Service} as well.
+ * <p/>
+ * This allows {@link org.apache.camel.Processor}s to prepare for shutdown, such as when
+ * {@link org.apache.camel.CamelContext} or a {@link org.apache.camel.Route} is shutting down.
+ * The {@link org.apache.camel.Processor} could be a stateful EIP such as the
+ * {@link org.apache.camel.processor.aggregate.AggregateProcessor}, allowing it to do custom work
+ * to prepare for shutdown.
+ */
+public interface ShutdownPrepared {
+
+    /**
+     * Prepares for shutdown.
+     * <p/>
+     * The {@link ShutdownStrategy} supports preparing for shutdown using two steps.
+     * First a regular preparation, where the given forced parameter will be <tt>false</tt>.
+     * And if the shutdown times out, then the {@link ShutdownStrategy} performs a more aggressive
+     * shutdown, calling this method a second time with <tt>true</tt> for the given forced parameter.
+     * <p/>
+     * For example by graceful stopping any threads or the likes.
+     * <p/>
+     * For a forced shutdown, the service is expected to aggressively shut down any child services, such
+     * as thread pools etc. This is the last chance it has to perform such duties.
+     * 
+     * @param forced <tt>true</tt> is forcing a more aggressive shutdown, <tt>false</tt> is for preparing to shutdown. 
+     */
+    void prepareShutdown(boolean forced);
+
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Mon Jan 30 08:39:25 2012
@@ -18,8 +18,11 @@ package org.apache.camel.util;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.camel.Navigate;
 import org.apache.camel.Service;
 import org.apache.camel.ShutdownableService;
 import org.apache.camel.StatefulService;
@@ -341,4 +344,31 @@ public final class ServiceHelper {
         return false;
     }
 
+    /**
+     * Gathers the given service and all its child services by recursively navigating the service.
+     *
+     * @param service the service
+     * @return the services, including the parent service, and all its children
+     */
+    public static Set<Service> getChildServices(Service service) {
+        Set<Service> answer = new LinkedHashSet<Service>();
+        doGetChildServices(answer, service);
+        return answer;
+    }
+
+    private static void doGetChildServices(Set<Service> services, Service service) {
+        services.add(service);
+        if (service instanceof Navigate) {
+            Navigate<?> nav = (Navigate<?>) service;
+            if (nav.hasNext()) {
+                List<?> children = nav.next();
+                for (Object child : children) {
+                    if (child instanceof Service) {
+                        doGetChildServices(services, (Service) child);
+                    }
+                }
+            }
+        }
+    }
+    
 }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java (from r1237241, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java&r1=1237241&r2=1237570&rev=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java Mon Jan 30 08:39:25 2012
@@ -16,67 +16,28 @@
  */
 package org.apache.camel.processor.aggregator;
 
-import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
 
 /**
  * @version 
  */
-public class AggregateForceCompletionOnStopTest extends ContextTestSupport {
-
-    // TODO: Need CAMEL-4953 to fix me
-
-    MyCompletionProcessor myCompletionProcessor = new MyCompletionProcessor();
-
-    public void testFixMe() throws Exception {
-        // TODO: remove me
-    }
-
-    public void xxxTestForceCompletionTrue() throws Exception {
-        myCompletionProcessor.reset();
-        context.getShutdownStrategy().setShutdownNowOnTimeout(true);
-        context.getShutdownStrategy().setTimeout(5);
-
-        template.sendBodyAndHeader("direct:forceCompletionTrue", "test1", "id", "1");
-        template.sendBodyAndHeader("direct:forceCompletionTrue", "test2", "id", "2");
-        template.sendBodyAndHeader("direct:forceCompletionTrue", "test3", "id", "1");
-        template.sendBodyAndHeader("direct:forceCompletionTrue", "test4", "id", "2");
-        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
-        context.stop();
-        assertEquals("aggregation should have completed", 2, myCompletionProcessor.getAggregationCount());
-    }
-
-    public void xxxTestForceCompletionFalse() throws Exception {
-        myCompletionProcessor.reset();
-        context.getShutdownStrategy().setShutdownNowOnTimeout(true);
-        context.getShutdownStrategy().setTimeout(5);
-
-        template.sendBodyAndHeader("direct:forceCompletionFalse", "test1", "id", "1");
-        template.sendBodyAndHeader("direct:forceCompletionFalse", "test2", "id", "2");
-        template.sendBodyAndHeader("direct:forceCompletionFalse", "test3", "id", "1");
-        template.sendBodyAndHeader("direct:forceCompletionFalse", "test4", "id", "2");
-        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
-        context.stop();
-        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
-    }
+public class AggregateForceCompletionOnStopParallelTest extends AggregateForceCompletionOnStopTest {
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
-
             @Override
             public void configure() throws Exception {
-
-                from("direct:forceCompletionTrue")
-                    .aggregate(header("id"), new BodyInAggregatingStrategy()).forceCompletionOnStop().completionSize(10)
+                from("direct:forceCompletionTrue").routeId("foo")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy()).forceCompletionOnStop().completionSize(10).parallelProcessing()
                     .delay(100)
-                    .process(myCompletionProcessor);
+                    .processRef("myCompletionProcessor");
 
-                from("direct:forceCompletionFalse")
-                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(10)
+                from("direct:forceCompletionFalse").routeId("bar")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(10).parallelProcessing()
                     .delay(100)
-                    .process(myCompletionProcessor);
+                    .processRef("myCompletionProcessor");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java Mon Jan 30 08:39:25 2012
@@ -18,6 +18,7 @@ package org.apache.camel.processor.aggre
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
 
 /**
@@ -25,16 +26,44 @@ import org.apache.camel.processor.BodyIn
  */
 public class AggregateForceCompletionOnStopTest extends ContextTestSupport {
 
-    // TODO: Need CAMEL-4953 to fix me
+    public void testForceCompletionTrue() throws Exception {
+        MyCompletionProcessor myCompletionProcessor = context.getRegistry().lookup("myCompletionProcessor", MyCompletionProcessor.class);
+        myCompletionProcessor.reset();
 
-    MyCompletionProcessor myCompletionProcessor = new MyCompletionProcessor();
+        context.getShutdownStrategy().setShutdownNowOnTimeout(true);
+        context.getShutdownStrategy().setTimeout(5);
+
+        template.sendBodyAndHeader("direct:forceCompletionTrue", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:forceCompletionTrue", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:forceCompletionTrue", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:forceCompletionTrue", "test4", "id", "2");
 
-    public void testFixMe() throws Exception {
-        // TODO: remove me
+        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+        context.stop();
+        assertEquals("aggregation should have completed", 2, myCompletionProcessor.getAggregationCount());
     }
 
-    public void xxxTestForceCompletionTrue() throws Exception {
+    public void testForceCompletionFalse() throws Exception {
+        MyCompletionProcessor myCompletionProcessor = context.getRegistry().lookup("myCompletionProcessor", MyCompletionProcessor.class);
         myCompletionProcessor.reset();
+
+        context.getShutdownStrategy().setShutdownNowOnTimeout(true);
+        context.getShutdownStrategy().setTimeout(5);
+
+        template.sendBodyAndHeader("direct:forceCompletionFalse", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:forceCompletionFalse", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:forceCompletionFalse", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:forceCompletionFalse", "test4", "id", "2");
+
+        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+        context.stop();
+        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+    }
+
+    public void testStopRouteForceCompletionTrue() throws Exception {
+        MyCompletionProcessor myCompletionProcessor = context.getRegistry().lookup("myCompletionProcessor", MyCompletionProcessor.class);
+        myCompletionProcessor.reset();
+
         context.getShutdownStrategy().setShutdownNowOnTimeout(true);
         context.getShutdownStrategy().setTimeout(5);
 
@@ -42,13 +71,17 @@ public class AggregateForceCompletionOnS
         template.sendBodyAndHeader("direct:forceCompletionTrue", "test2", "id", "2");
         template.sendBodyAndHeader("direct:forceCompletionTrue", "test3", "id", "1");
         template.sendBodyAndHeader("direct:forceCompletionTrue", "test4", "id", "2");
+
         assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
-        context.stop();
+        // stopping a route should also force the completion
+        context.stopRoute("foo");
         assertEquals("aggregation should have completed", 2, myCompletionProcessor.getAggregationCount());
     }
 
-    public void xxxTestForceCompletionFalse() throws Exception {
+    public void testStopRouteForceCompletionFalse() throws Exception {
+        MyCompletionProcessor myCompletionProcessor = context.getRegistry().lookup("myCompletionProcessor", MyCompletionProcessor.class);
         myCompletionProcessor.reset();
+
         context.getShutdownStrategy().setShutdownNowOnTimeout(true);
         context.getShutdownStrategy().setTimeout(5);
 
@@ -56,27 +89,33 @@ public class AggregateForceCompletionOnS
         template.sendBodyAndHeader("direct:forceCompletionFalse", "test2", "id", "2");
         template.sendBodyAndHeader("direct:forceCompletionFalse", "test3", "id", "1");
         template.sendBodyAndHeader("direct:forceCompletionFalse", "test4", "id", "2");
+
         assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
-        context.stop();
+        context.stopRoute("bar");
         assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
     }
 
     @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myCompletionProcessor", new MyCompletionProcessor());
+        return jndi;
+    }
+
+    @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
-
             @Override
             public void configure() throws Exception {
-
-                from("direct:forceCompletionTrue")
+                from("direct:forceCompletionTrue").routeId("foo")
                     .aggregate(header("id"), new BodyInAggregatingStrategy()).forceCompletionOnStop().completionSize(10)
                     .delay(100)
-                    .process(myCompletionProcessor);
+                    .processRef("myCompletionProcessor");
 
-                from("direct:forceCompletionFalse")
+                from("direct:forceCompletionFalse").routeId("bar")
                     .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(10)
                     .delay(100)
-                    .process(myCompletionProcessor);
+                    .processRef("myCompletionProcessor");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java Mon Jan 30 08:39:25 2012
@@ -16,22 +16,24 @@
  */
 package org.apache.camel.processor.aggregator;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
 public class MyCompletionProcessor implements Processor {
-    private static int aggregationCount;
+    private final AtomicInteger aggregationCount = new AtomicInteger();
 
     public int getAggregationCount() {
-        return aggregationCount;
+        return aggregationCount.get();
     }
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        aggregationCount++;
+        aggregationCount.incrementAndGet();
     }
 
     public void reset() {
-        aggregationCount = 0;
+        aggregationCount.set(0);
     }
 }

Modified: camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java (original)
+++ camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java Mon Jan 30 08:39:25 2012
@@ -103,7 +103,8 @@ public class KestrelConsumer extends Def
         return pendingExchangeCount.get();
     }
 
-    public void prepareShutdown() {
+    @Override
+    public void prepareShutdown(boolean forced) {
         // Signal to our threads that shutdown is happening
         shutdownPending = true;
 

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java Mon Jan 30 08:39:25 2012
@@ -95,7 +95,8 @@ public class RouteboxDirectConsumer exte
         return 0;
     }
 
-    public void prepareShutdown() {
+    @Override
+    public void prepareShutdown(boolean forced) {
         // noop
     }
     

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java Mon Jan 30 08:39:25 2012
@@ -122,8 +122,9 @@ public class RouteboxSedaConsumer extend
         // TODO: Get size of queue
         return 0;
     }
-    
-    public void prepareShutdown() {
+
+    @Override
+    public void prepareShutdown(boolean forced) {
     }
     
     public void setProcessor(AsyncProcessor processor) {

Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml Mon Jan 30 08:39:25 2012
@@ -23,18 +23,20 @@
     ">
 
     <camelContext xmlns="http://camel.apache.org/schema/spring">
-        <route>
+        <route id="foo">
             <from uri="direct:forceCompletionTrue"/>
             <aggregate strategyRef="aggregatorStrategy" forceCompletionOnStop="true" completionSize="10">
                 <correlationExpression><header>id</header></correlationExpression>
+                <delay><constant>100</constant></delay>
                 <process ref="myCompletionProcessor"/>
             </aggregate>
         </route>
 
-        <route>
+        <route id="bar">
             <from uri="direct:forceCompletionFalse"/>
             <aggregate strategyRef="aggregatorStrategy" completionSize="10">
                 <correlationExpression><header>id</header></correlationExpression>
+                <delay><constant>100</constant></delay>
                 <process ref="myCompletionProcessor"/>
             </aggregate>
         </route>