You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/30 09:39:26 UTC
svn commit: r1237570 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/direct/
camel-core/src/main/java/org/apache/camel/component/seda/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/proc...
Author: davsclaus
Date: Mon Jan 30 08:39:25 2012
New Revision: 1237570
URL: http://svn.apache.org/viewvc?rev=1237570&view=rev
Log:
CAMEL-4953: Added ShutdownPrepared interface to allow Services in routes to do custom shutdown logic. For example the Aggregate EIP.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java
- copied, changed from r1237241, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java
camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java Mon Jan 30 08:39:25 2012
@@ -77,7 +77,7 @@ public class DirectConsumer extends Defa
return 0;
}
- public void prepareShutdown() {
+ public void prepareShutdown(boolean forced) {
// noop
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Mon Jan 30 08:39:25 2012
@@ -101,7 +101,8 @@ public class SedaConsumer extends Servic
return endpoint.getQueue().size();
}
- public void prepareShutdown() {
+ @Override
+ public void prepareShutdown(boolean forced) {
// signal we want to shutdown
shutdownPending = true;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Mon Jan 30 08:39:25 2012
@@ -1546,7 +1546,7 @@ public class DefaultCamelContext extends
// stop route inputs in the same order as they was started so we stop the very first inputs first
try {
- // force shutting down routes as they mau otherwise cause shutdown to hang
+ // force shutting down routes as they may otherwise cause shutdown to hang
shutdownStrategy.shutdownForced(this, getRouteStartupOrder());
} catch (Throwable e) {
log.warn("Error occurred while shutting down routes. This exception will be ignored.", e);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java Mon Jan 30 08:39:25 2012
@@ -17,11 +17,13 @@
package org.apache.camel.impl;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.camel.Consumer;
import org.apache.camel.Route;
+import org.apache.camel.Service;
import org.apache.camel.spi.RouteStartupOrder;
/**
@@ -58,6 +60,15 @@ public class DefaultRouteStartupOrder im
return answer;
}
+ public List<Service> getServices() {
+ List<Service> answer = new ArrayList<Service>();
+ Collection<Route> routes = routeService.getRoutes();
+ for (Route route : routes) {
+ answer.addAll(route.getServices());
+ }
+ return answer;
+ }
+
public RouteService getRouteService() {
return routeService;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Mon Jan 30 08:39:25 2012
@@ -19,7 +19,9 @@ package org.apache.camel.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -36,6 +38,7 @@ import org.apache.camel.ShutdownRunningT
import org.apache.camel.SuspendableService;
import org.apache.camel.spi.RouteStartupOrder;
import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.spi.ShutdownStrategy;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.EventHelper;
@@ -51,6 +54,23 @@ import org.slf4j.LoggerFactory;
* Graceful shutdown ensures that any inflight and pending messages will be taken into account
* and it will wait until these exchanges has been completed.
* <p/>
+ * This strategy will perform graceful shutdown in two steps:
+ * <ul>
+ * <li>Graceful - By suspending/stopping consumers, and let any in-flight exchanges complete</li>
+ * <li>Forced - After a given period of time, a timeout occurred and if there are still pending
+ * exchanges to complete, then a more aggressive forced strategy is performed.</li>
+ * </ul>
+ * The idea by the <tt>graceful</tt> shutdown strategy, is to stop taking in more new messages,
+ * and allow any existing inflight messages to complete. Then when there is no more inflight messages
+ * then the routes can be fully shutdown. This means that if there are inflight messages then we will have
+ * to wait for these messages to complete. If they do not complete after a period of time, then a
+ * timeout triggers. And then a more aggressive strategy takes over.
+ * <p/>
+ * The idea by the <tt>forced</tt> shutdown strategy, is to stop processing messages,
+ * and force routes and their services to shutdown now. There is a risk when shutting down now,
+ * that some resources are not properly shutdown, which can cause side effects. The timeout value
+ * is by default 300 seconds, but can be customized.
+ * <p/>
* As this strategy will politely wait until all exchanges has been completed it can potential wait
* for a long time, and hence why a timeout value can be set. When the timeout triggers you can also
* specify whether the remainder consumers should be shutdown now or ignore.
@@ -62,6 +82,15 @@ import org.slf4j.LoggerFactory;
* <p/>
* Routes will by default be shutdown in the reverse order of which they where started.
* You can customize this using the {@link #setShutdownRoutesInReverseOrder(boolean)} method.
+ * <p/>
+ * After route consumers have been shutdown, then any {@link ShutdownPrepared} services on the routes
+ * are prepared for shutdown, by invoking {@link ShutdownPrepared#prepareShutdown(boolean)} with
+ * <tt>force=false</tt>.
+ * <p/>
+ * Then if a timeout occurred and the strategy has been configured with shutdown-now on timeout, then
+ * the strategy performs a more aggressive forced shutdown, by forcing all consumers to shutdown
+ * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean)} with <tt>force=true</tt>
+ * on the services. This allows the services to know they should force shutdown now.
*
* @version
*/
@@ -156,6 +185,13 @@ public class DefaultShutdownStrategy ext
LOG.warn("Timeout occurred. Now forcing the routes to be shutdown now.");
// force the routes to shutdown now
shutdownRoutesNow(routesOrdered);
+
+ // now the route consumers has been shutdown, then prepare route services for shutdown now (forced)
+ for (RouteStartupOrder order : routes) {
+ for (Service service : order.getServices()) {
+ prepareShutdown(service, true, true);
+ }
+ }
} else {
LOG.warn("Timeout occurred. Will ignore shutting down the remainder routes.");
}
@@ -319,6 +355,38 @@ public class DefaultShutdownStrategy ext
}
}
+ /**
+ * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean)} method
+ * on the service if it implement this interface.
+ *
+ * @param service the service
+ * @param forced whether to force shutdown
+ * @param includeChildren whether to prepare the child of the service as well
+ */
+ private static void prepareShutdown(Service service, boolean forced, boolean includeChildren) {
+ Set<Service> list;
+ if (includeChildren) {
+ list = ServiceHelper.getChildServices(service);
+ } else {
+ list = new LinkedHashSet<Service>(1);
+ list.add(service);
+ }
+
+ for (Service child : list) {
+ if (child instanceof ShutdownPrepared) {
+ try {
+ LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child);
+ ((ShutdownPrepared) child).prepareShutdown(forced);
+ } catch (Exception e) {
+ LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Holder for deferred consumers
+ */
static class ShutdownDeferredConsumer {
private final Route route;
private final Consumer consumer;
@@ -372,7 +440,6 @@ public class DefaultShutdownStrategy ext
// list of deferred consumers to shutdown when all exchanges has been completed routed
// and thus there are no more inflight exchanges so they can be safely shutdown at that time
List<ShutdownDeferredConsumer> deferredConsumers = new ArrayList<ShutdownDeferredConsumer>();
-
for (RouteStartupOrder order : routes) {
ShutdownRoute shutdownRoute = order.getRoute().getRouteContext().getShutdownRoute();
@@ -469,7 +536,8 @@ public class DefaultShutdownStrategy ext
Consumer consumer = deferred.getConsumer();
if (consumer instanceof ShutdownAware) {
LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
- ((ShutdownAware) consumer).prepareShutdown();
+ boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
+ prepareShutdown(consumer, forced, false);
LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
}
}
@@ -485,6 +553,14 @@ public class DefaultShutdownStrategy ext
LOG.info("Route: {} shutdown complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
}
}
+
+ // now the route consumers has been shutdown, then prepare route services for shutdown
+ for (RouteStartupOrder order : routes) {
+ for (Service service : order.getServices()) {
+ boolean forced = context.getShutdownStrategy().forceShutdown(service);
+ prepareShutdown(service, forced, true);
+ }
+ }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Mon Jan 30 08:39:25 2012
@@ -134,7 +134,7 @@ public class RouteService extends ChildS
// gather list of services to start as we need to start child services as well
Set<Service> list = new LinkedHashSet<Service>();
for (Service service : services) {
- doGetChildServices(list, service);
+ list.addAll(ServiceHelper.getChildServices(service));
}
// split into consumers and child services as we need to start the consumers
@@ -202,7 +202,7 @@ public class RouteService extends ChildS
// gather list of services to stop as we need to start child services as well
Set<Service> list = new LinkedHashSet<Service>();
for (Service service : services) {
- doGetChildServices(list, service);
+ list.addAll(ServiceHelper.getChildServices(service));
}
stopChildService(route, list, isShutdownCamelContext);
@@ -240,7 +240,7 @@ public class RouteService extends ChildS
// gather list of services to stop as we need to start child services as well
Set<Service> list = new LinkedHashSet<Service>();
for (Service service : services) {
- doGetChildServices(list, service);
+ list.addAll(ServiceHelper.getChildServices(service));
}
// shutdown services
@@ -351,23 +351,4 @@ public class RouteService extends ChildS
}
}
- /**
- * Gather all child services by navigating the service to recursively gather all child services.
- */
- private static void doGetChildServices(Set<Service> services, Service service) throws Exception {
- services.add(service);
-
- if (service instanceof Navigate) {
- Navigate<?> nav = (Navigate<?>) service;
- if (nav.hasNext()) {
- List<?> children = nav.next();
- for (Object child : children) {
- if (child instanceof Service) {
- doGetChildServices(services, (Service) child);
- }
- }
- }
- }
- }
-
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java Mon Jan 30 08:39:25 2012
@@ -74,7 +74,7 @@ public abstract class ScheduledBatchPoll
}
@Override
- public void prepareShutdown() {
+ public void prepareShutdown(boolean forced) {
// reset task as the state of the task is not to be preserved
// which otherwise may cause isBatchAllowed() to return a wrong answer
this.shutdownRunningTask = null;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Mon Jan 30 08:39:25 2012
@@ -47,6 +47,7 @@ import org.apache.camel.impl.LoggingExce
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultTimeoutMap;
import org.apache.camel.support.ServiceSupport;
@@ -74,7 +75,7 @@ import org.slf4j.LoggerFactory;
* and older prices are discarded). Another idea is to combine line item messages
* together into a single invoice message.
*/
-public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
+public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable, ShutdownPrepared {
public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
@@ -879,15 +880,9 @@ public class AggregateProcessor extends
@Override
protected void doStop() throws Exception {
-
- if (forceCompletionOnStop) {
- forceCompletionOfAllGroups();
-
- while (inProgressCompleteExchanges.size() > 0) {
- LOG.trace("waiting for {} in progress exchanges to complete", inProgressCompleteExchanges.size());
- Thread.sleep(100);
- }
- }
+ // note: we cannot do doForceCompletionOnStop from this doStop method
+ // as this is handled in the prepareShutdown method which is also invoked when stopping a route
+ // and is better suited for preparing to shutdown than this doStop method is
if (recoverService != null) {
camelContext.getExecutorServiceManager().shutdownNow(recoverService);
@@ -904,6 +899,37 @@ public class AggregateProcessor extends
}
@Override
+ public void prepareShutdown(boolean forced) {
+ // we are shutting down, so force completion if this option was enabled
+ // but only do this when forced=false, as that is when we have chance to
+ // send out new messages to be routed by Camel. When forced=true, then
+ // we have to shutdown in a hurry
+ if (!forced && forceCompletionOnStop) {
+ doForceCompletionOnStop();
+ }
+ }
+
+ private void doForceCompletionOnStop() {
+ int expected = forceCompletionOfAllGroups();
+
+ StopWatch watch = new StopWatch();
+ while (inProgressCompleteExchanges.size() > 0) {
+ LOG.trace("Waiting for {} inflight exchanges to complete", inProgressCompleteExchanges.size());
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // break out as we got interrupted such as the JVM terminating
+ LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", inProgressCompleteExchanges.size());
+ break;
+ }
+ }
+
+ if (expected > 0) {
+ LOG.info("Forcing completion of all groups with {} exchanges completed in {}", expected, TimeUtils.printDuration(watch.stop()));
+ }
+ }
+
+ @Override
protected void doShutdown() throws Exception {
// shutdown aggregation repository
ServiceHelper.stopService(aggregationRepository);
@@ -914,12 +940,13 @@ public class AggregateProcessor extends
super.doShutdown();
}
- public void forceCompletionOfAllGroups() {
+ public int forceCompletionOfAllGroups() {
- // only run if CamelContext has been fully started
- if (!camelContext.getStatus().isStarted()) {
+ // only run if CamelContext has been fully started or is stopping
+ boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping();
+ if (!allow) {
LOG.warn("cannot start force completion because CamelContext({}) has not been started yet", camelContext.getName());
- return;
+ return 0;
}
LOG.trace("Starting force completion of all groups task");
@@ -927,9 +954,11 @@ public class AggregateProcessor extends
// trigger completion for all in the repository
Set<String> keys = aggregationRepository.getKeys();
+ int total = 0;
if (keys != null && !keys.isEmpty()) {
// must acquire the shared aggregation lock to be able to trigger force completion
lock.lock();
+ total = keys.size();
try {
for (String key : keys) {
Exchange exchange = aggregationRepository.get(camelContext, key);
@@ -944,7 +973,12 @@ public class AggregateProcessor extends
lock.unlock();
}
}
-
LOG.trace("Completed force completion of all groups task");
+
+ if (total > 0) {
+ LOG.debug("Forcing completion of all groups with {} exchanges", total);
+ }
+ return total;
}
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java Mon Jan 30 08:39:25 2012
@@ -319,7 +319,7 @@ public class DefaultChannel extends Serv
// determine if we can still run, or the camel context is forcing a shutdown
boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
if (forceShutdown) {
- LOG.trace("Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: {}", exchange);
+ LOG.debug("Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: {}", exchange);
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java Mon Jan 30 08:39:25 2012
@@ -20,6 +20,7 @@ import java.util.List;
import org.apache.camel.Consumer;
import org.apache.camel.Route;
+import org.apache.camel.Service;
/**
* Information about a route to be started where we want to control the order
@@ -53,4 +54,11 @@ public interface RouteStartupOrder {
*/
List<Consumer> getInputs();
+ /**
+ * Gets the services to this route.
+ *
+ * @return the services.
+ */
+ List<Service> getServices();
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java Mon Jan 30 08:39:25 2012
@@ -27,7 +27,7 @@ import org.apache.camel.ShutdownRunningT
* @version
* @see org.apache.camel.spi.ShutdownStrategy
*/
-public interface ShutdownAware {
+public interface ShutdownAware extends ShutdownPrepared {
/**
* To defer shutdown during first phase of shutdown. This allows any pending exchanges to be completed
@@ -53,10 +53,4 @@ public interface ShutdownAware {
*/
int getPendingExchangesSize();
- /**
- * Prepares the consumer for shutdown.
- * <p/>
- * For example by graceful stopping any threads or the likes.
- */
- void prepareShutdown();
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java?rev=1237570&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java Mon Jan 30 08:39:25 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * Allows a {@link org.apache.camel.Service} to prepare for shutdown.
+ * <p/>
+ * <b>Important: </b> Implementors of this interface must be a {@link org.apache.camel.Service} as well.
+ * <p/>
+ * This allows {@link org.apache.camel.Processor}s to prepare for shutdown, such as when
+ * {@link org.apache.camel.CamelContext} or a {@link org.apache.camel.Route} is shutting down.
+ * The {@link org.apache.camel.Processor} could be a stateful EIP such as the
+ * {@link org.apache.camel.processor.aggregate.AggregateProcessor}, allowing it to do custom work
+ * to prepare for shutdown.
+ */
+public interface ShutdownPrepared {
+
+ /**
+ * Prepares for shutdown.
+ * <p/>
+ * The {@link ShutdownStrategy} supports preparing for shutdown using two steps.
+ * First a regular preparation, where the given forced parameter will be <tt>false</tt>.
+ * And if the shutdown times out, then the {@link ShutdownStrategy} performs a more aggressive
+ * shutdown, calling this method a second time with <tt>true</tt> for the given forced parameter.
+ * <p/>
+ * For example by graceful stopping any threads or the likes.
+ * <p/>
+ * For forced shutdown, then the service is expected to aggressively shutdown any child services, such
+ * as thread pools etc. This is the last chance it has to perform such duties.
+ *
+ * @param forced <tt>true</tt> is forcing a more aggressive shutdown, <tt>false</tt> is for preparing to shutdown.
+ */
+ void prepareShutdown(boolean forced);
+
+}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Mon Jan 30 08:39:25 2012
@@ -18,8 +18,11 @@ package org.apache.camel.util;
import java.util.Arrays;
import java.util.Collection;
+import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Set;
+import org.apache.camel.Navigate;
import org.apache.camel.Service;
import org.apache.camel.ShutdownableService;
import org.apache.camel.StatefulService;
@@ -341,4 +344,31 @@ public final class ServiceHelper {
return false;
}
+ /**
+ * Gather all child services by navigating the service to recursively gather all child services.
+ *
+ * @param service the service
+ * @return the services, including the parent service, and all its children
+ */
+ public static Set<Service> getChildServices(Service service) {
+ Set<Service> answer = new LinkedHashSet<Service>();
+ doGetChildServices(answer, service);
+ return answer;
+ }
+
+ private static void doGetChildServices(Set<Service> services, Service service) {
+ services.add(service);
+ if (service instanceof Navigate) {
+ Navigate<?> nav = (Navigate<?>) service;
+ if (nav.hasNext()) {
+ List<?> children = nav.next();
+ for (Object child : children) {
+ if (child instanceof Service) {
+ doGetChildServices(services, (Service) child);
+ }
+ }
+ }
+ }
+ }
+
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java (from r1237241, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java&r1=1237241&r2=1237570&rev=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java Mon Jan 30 08:39:25 2012
@@ -16,67 +16,28 @@
*/
package org.apache.camel.processor.aggregator;
-import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.BodyInAggregatingStrategy;
/**
* @version
*/
-public class AggregateForceCompletionOnStopTest extends ContextTestSupport {
-
- // TODO: Need CAMEL-4953 to fix me
-
- MyCompletionProcessor myCompletionProcessor = new MyCompletionProcessor();
-
- public void testFixMe() throws Exception {
- // TODO: remove me
- }
-
- public void xxxTestForceCompletionTrue() throws Exception {
- myCompletionProcessor.reset();
- context.getShutdownStrategy().setShutdownNowOnTimeout(true);
- context.getShutdownStrategy().setTimeout(5);
-
- template.sendBodyAndHeader("direct:forceCompletionTrue", "test1", "id", "1");
- template.sendBodyAndHeader("direct:forceCompletionTrue", "test2", "id", "2");
- template.sendBodyAndHeader("direct:forceCompletionTrue", "test3", "id", "1");
- template.sendBodyAndHeader("direct:forceCompletionTrue", "test4", "id", "2");
- assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
- context.stop();
- assertEquals("aggregation should have completed", 2, myCompletionProcessor.getAggregationCount());
- }
-
- public void xxxTestForceCompletionFalse() throws Exception {
- myCompletionProcessor.reset();
- context.getShutdownStrategy().setShutdownNowOnTimeout(true);
- context.getShutdownStrategy().setTimeout(5);
-
- template.sendBodyAndHeader("direct:forceCompletionFalse", "test1", "id", "1");
- template.sendBodyAndHeader("direct:forceCompletionFalse", "test2", "id", "2");
- template.sendBodyAndHeader("direct:forceCompletionFalse", "test3", "id", "1");
- template.sendBodyAndHeader("direct:forceCompletionFalse", "test4", "id", "2");
- assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
- context.stop();
- assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
- }
+public class AggregateForceCompletionOnStopParallelTest extends AggregateForceCompletionOnStopTest {
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
-
@Override
public void configure() throws Exception {
-
- from("direct:forceCompletionTrue")
- .aggregate(header("id"), new BodyInAggregatingStrategy()).forceCompletionOnStop().completionSize(10)
+ from("direct:forceCompletionTrue").routeId("foo")
+ .aggregate(header("id"), new BodyInAggregatingStrategy()).forceCompletionOnStop().completionSize(10).parallelProcessing()
.delay(100)
- .process(myCompletionProcessor);
+ .processRef("myCompletionProcessor");
- from("direct:forceCompletionFalse")
- .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(10)
+ from("direct:forceCompletionFalse").routeId("bar")
+ .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(10).parallelProcessing()
.delay(100)
- .process(myCompletionProcessor);
+ .processRef("myCompletionProcessor");
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java Mon Jan 30 08:39:25 2012
@@ -18,6 +18,7 @@ package org.apache.camel.processor.aggre
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.processor.BodyInAggregatingStrategy;
/**
@@ -25,16 +26,44 @@ import org.apache.camel.processor.BodyIn
*/
public class AggregateForceCompletionOnStopTest extends ContextTestSupport {
- // TODO: Need CAMEL-4953 to fix me
+ public void testForceCompletionTrue() throws Exception {
+ MyCompletionProcessor myCompletionProcessor = context.getRegistry().lookup("myCompletionProcessor", MyCompletionProcessor.class);
+ myCompletionProcessor.reset();
- MyCompletionProcessor myCompletionProcessor = new MyCompletionProcessor();
+ context.getShutdownStrategy().setShutdownNowOnTimeout(true);
+ context.getShutdownStrategy().setTimeout(5);
+
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test1", "id", "1");
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test2", "id", "2");
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test3", "id", "1");
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test4", "id", "2");
- public void testFixMe() throws Exception {
- // TODO: remove me
+ assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+ context.stop();
+ assertEquals("aggregation should have completed", 2, myCompletionProcessor.getAggregationCount());
}
- public void xxxTestForceCompletionTrue() throws Exception {
+ public void testForceCompletionFalse() throws Exception {
+ MyCompletionProcessor myCompletionProcessor = context.getRegistry().lookup("myCompletionProcessor", MyCompletionProcessor.class);
myCompletionProcessor.reset();
+
+ context.getShutdownStrategy().setShutdownNowOnTimeout(true);
+ context.getShutdownStrategy().setTimeout(5);
+
+ template.sendBodyAndHeader("direct:forceCompletionFalse", "test1", "id", "1");
+ template.sendBodyAndHeader("direct:forceCompletionFalse", "test2", "id", "2");
+ template.sendBodyAndHeader("direct:forceCompletionFalse", "test3", "id", "1");
+ template.sendBodyAndHeader("direct:forceCompletionFalse", "test4", "id", "2");
+
+ assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+ context.stop();
+ assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+ }
+
+ public void testStopRouteForceCompletionTrue() throws Exception {
+ MyCompletionProcessor myCompletionProcessor = context.getRegistry().lookup("myCompletionProcessor", MyCompletionProcessor.class);
+ myCompletionProcessor.reset();
+
context.getShutdownStrategy().setShutdownNowOnTimeout(true);
context.getShutdownStrategy().setTimeout(5);
@@ -42,13 +71,17 @@ public class AggregateForceCompletionOnS
template.sendBodyAndHeader("direct:forceCompletionTrue", "test2", "id", "2");
template.sendBodyAndHeader("direct:forceCompletionTrue", "test3", "id", "1");
template.sendBodyAndHeader("direct:forceCompletionTrue", "test4", "id", "2");
+
assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
- context.stop();
+ // stopping a route should also force the completion
+ context.stopRoute("foo");
assertEquals("aggregation should have completed", 2, myCompletionProcessor.getAggregationCount());
}
- public void xxxTestForceCompletionFalse() throws Exception {
+ public void testStopRouteForceCompletionFalse() throws Exception {
+ MyCompletionProcessor myCompletionProcessor = context.getRegistry().lookup("myCompletionProcessor", MyCompletionProcessor.class);
myCompletionProcessor.reset();
+
context.getShutdownStrategy().setShutdownNowOnTimeout(true);
context.getShutdownStrategy().setTimeout(5);
@@ -56,27 +89,33 @@ public class AggregateForceCompletionOnS
template.sendBodyAndHeader("direct:forceCompletionFalse", "test2", "id", "2");
template.sendBodyAndHeader("direct:forceCompletionFalse", "test3", "id", "1");
template.sendBodyAndHeader("direct:forceCompletionFalse", "test4", "id", "2");
+
assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
- context.stop();
+ context.stopRoute("bar");
assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
}
@Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myCompletionProcessor", new MyCompletionProcessor());
+ return jndi;
+ }
+
+ @Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
-
@Override
public void configure() throws Exception {
-
- from("direct:forceCompletionTrue")
+ from("direct:forceCompletionTrue").routeId("foo")
.aggregate(header("id"), new BodyInAggregatingStrategy()).forceCompletionOnStop().completionSize(10)
.delay(100)
- .process(myCompletionProcessor);
+ .processRef("myCompletionProcessor");
- from("direct:forceCompletionFalse")
+ from("direct:forceCompletionFalse").routeId("bar")
.aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(10)
.delay(100)
- .process(myCompletionProcessor);
+ .processRef("myCompletionProcessor");
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java Mon Jan 30 08:39:25 2012
@@ -16,22 +16,24 @@
*/
package org.apache.camel.processor.aggregator;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
public class MyCompletionProcessor implements Processor {
- private static int aggregationCount;
+ private final AtomicInteger aggregationCount = new AtomicInteger();
public int getAggregationCount() {
- return aggregationCount;
+ return aggregationCount.get();
}
@Override
public void process(Exchange exchange) throws Exception {
- aggregationCount++;
+ aggregationCount.incrementAndGet();
}
public void reset() {
- aggregationCount = 0;
+ aggregationCount.set(0);
}
}
Modified: camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java (original)
+++ camel/trunk/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java Mon Jan 30 08:39:25 2012
@@ -103,7 +103,8 @@ public class KestrelConsumer extends Def
return pendingExchangeCount.get();
}
- public void prepareShutdown() {
+ @Override
+ public void prepareShutdown(boolean forced) {
// Signal to our threads that shutdown is happening
shutdownPending = true;
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java Mon Jan 30 08:39:25 2012
@@ -95,7 +95,8 @@ public class RouteboxDirectConsumer exte
return 0;
}
- public void prepareShutdown() {
+ @Override
+ public void prepareShutdown(boolean forced) {
// noop
}
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java Mon Jan 30 08:39:25 2012
@@ -122,8 +122,9 @@ public class RouteboxSedaConsumer extend
// TODO: Get size of queue
return 0;
}
-
- public void prepareShutdown() {
+
+ @Override
+ public void prepareShutdown(boolean forced) {
}
public void setProcessor(AsyncProcessor processor) {
Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml?rev=1237570&r1=1237569&r2=1237570&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml Mon Jan 30 08:39:25 2012
@@ -23,18 +23,20 @@
">
<camelContext xmlns="http://camel.apache.org/schema/spring">
- <route>
+ <route id="foo">
<from uri="direct:forceCompletionTrue"/>
<aggregate strategyRef="aggregatorStrategy" forceCompletionOnStop="true" completionSize="10">
<correlationExpression><header>id</header></correlationExpression>
+ <delay><constant>100</constant></delay>
<process ref="myCompletionProcessor"/>
</aggregate>
</route>
- <route>
+ <route id="bar">
<from uri="direct:forceCompletionFalse"/>
<aggregate strategyRef="aggregatorStrategy" completionSize="10">
<correlationExpression><header>id</header></correlationExpression>
+ <delay><constant>100</constant></delay>
<process ref="myCompletionProcessor"/>
</aggregate>
</route>