You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/12/26 13:00:31 UTC
svn commit: r893963 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/management/ camel-core...
Author: davsclaus
Date: Sat Dec 26 12:00:30 2009
New Revision: 893963
URL: http://svn.apache.org/viewvc?rev=893963&view=rev
Log:
CAMEL-1483: Added fine grained control for graceful shutdown (work in progress). Fixed spelling.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRoute.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRunningTask.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java (contents, props changed)
- copied, changed from r893726, camel/trunk/camel-core/src/main/java/org/apache/camel/impl/StartupRouteHolder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java
- copied, changed from r893766, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java (with props)
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownDeferTest.java (contents, props changed)
- copied, changed from r893766, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPBeforeTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownNotDeferTest.java (with props)
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownDeferTest.xml (contents, props changed)
- copied, changed from r893766, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopbefore.xml
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownNotDeferTest.xml (with props)
Removed:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/StartupRouteHolder.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/RuntimeConfiguration.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
camel/trunk/camel-core/src/main/resources/org/apache/camel/jaxb.index
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSimpleTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderWithDefaultTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/RuntimeConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/RuntimeConfiguration.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/RuntimeConfiguration.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/RuntimeConfiguration.java Sat Dec 26 12:00:30 2009
@@ -107,4 +107,36 @@
*/
boolean isAutoStartup();
+ /**
+ * Sets the option to use when shutting down routes.
+ *
+ * @param shutdownRoute the option to use.
+ */
+ void setShutdownRoute(ShutdownRoute shutdownRoute);
+
+ /**
+ * Gets the option to use when shutting down routes.
+ *
+ * @return the option
+ */
+ ShutdownRoute getShutdownRoute();
+
+ /**
+ * Sets the option to use when shutting down a route and how to act when it has running tasks.
+ * <p/>
+ * A running task is for example a {@link org.apache.camel.BatchConsumer} which has a group
+ * of messages to process. With this option you can control whether it should complete the entire
+ * group or stop after the current message has been processed.
+ *
+ * @param shutdownRunningTask the option to use.
+ */
+ void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask);
+
+ /**
+ * Gets the option to use when shutting down a route and how to act when it has running tasks.
+ *
+ * @return the option
+ */
+ ShutdownRunningTask getShutdownRunningTask();
+
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRoute.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRoute.java?rev=893963&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRoute.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRoute.java Sat Dec 26 12:00:30 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import javax.xml.bind.annotation.XmlEnum;
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * Represents the kinds of options for shutting down.
+ * <p/>
+ * Is used for example to defer shutting down a route until all inflight exchanges have
+ * been completed, after which the route can safely be shut down.
+ * <p/>
+ * This allows fine grained configuration to accomplish graceful shutdown, for example when
+ * you have some internal route which other routes are dependent upon.
+ * <ul>
+ * <li>Default - The <b>default</b> behavior where an attempt is made to shut down the route right away</li>
+ * <li>Defer - Will defer shutting down the route and let it be active during graceful shutdown.
+ * The route will at a later stage be shutdown during the graceful shutdown process.</li>
+ * </ul>
+ */
+@XmlType
+@XmlEnum(String.class)
+public enum ShutdownRoute {
+
+ Default, Defer
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRoute.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRoute.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRunningTask.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRunningTask.java?rev=893963&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRunningTask.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRunningTask.java Sat Dec 26 12:00:30 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import javax.xml.bind.annotation.XmlEnum;
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * Represents the kind of options for what to do with the current task when shutting down.
+ * <p/>
+ * By default the current task is allowed to complete. However some consumers such as
+ * {@link BatchConsumer} have pending tasks which you can configure to
+ * let it complete all those tasks as well.
+ * <ul>
+ * <li>CompleteCurrentTaskOnly - Is the <b>default</b> behavior where a route consumer will be shutdown as fast as
+ * possible. Allowing it to complete its current task, but not to pickup pending
+ * tasks (if any).</li>
+ * <li>CompleteAllTasks - Allows a route consumer to <i>do its business</i> which means that it will continue
+ * to complete any pending tasks (if any).</li>
+ * </ul>
+ * <b>Notice:</b> Most consumers only have a single task, but {@link org.apache.camel.BatchConsumer} can have
+ * many tasks and thus this option mostly applies to these kinds of consumers.
+ */
+@XmlType
+@XmlEnum(String.class)
+public enum ShutdownRunningTask {
+
+ CompleteCurrentTaskOnly, CompleteAllTasks
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRunningTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ShutdownRunningTask.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Dec 26 12:00:30 2009
@@ -161,7 +161,7 @@
protected void processExchange(final Exchange exchange) {
GenericFile<T> file = getExchangeFileProperty(exchange);
if (log.isTraceEnabled()) {
- log.trace("Processing remote file: " + file);
+ log.trace("Processing file: " + file);
}
try {
@@ -185,7 +185,7 @@
// retrieve the file using the stream
if (log.isTraceEnabled()) {
- log.trace("Retreiving file: " + name + " from: " + endpoint);
+ log.trace("Retrieving file: " + name + " from: " + endpoint);
}
operations.retrieveFile(name, exchange);
@@ -198,7 +198,7 @@
log.debug("About to process file: " + target + " using exchange: " + exchange);
}
- // register on completion callback that does the completiom stategies
+ // register on completion callback that does the completion strategies
// (for instance to move the file after we have processed it)
String originalFileName = file.getFileName();
exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, originalFileName));
@@ -216,7 +216,7 @@
* not
*
* @param file the remote file
- * @param isDirectory wether the file is a directory or a file
+ * @param isDirectory whether the file is a directory or a file
* @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
*/
@SuppressWarnings("unchecked")
@@ -249,7 +249,7 @@
* And then <tt>true</tt> for directories.
*
* @param file the file
- * @param isDirectory wether the file is a directory or a file
+ * @param isDirectory whether the file is a directory or a file
* @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not
*/
protected boolean isMatched(GenericFile<T> file, boolean isDirectory) {
@@ -290,7 +290,7 @@
// use file expression for a simple dynamic file filter
if (endpoint.getFileName() != null) {
- evaluteFileExpression();
+ evaluateFileExpression();
if (fileExpressionResult != null) {
if (!name.equals(fileExpressionResult)) {
return false;
@@ -312,7 +312,7 @@
return !endpoint.getInProgressRepository().add(key);
}
- private void evaluteFileExpression() {
+ private void evaluateFileExpression() {
if (fileExpressionResult == null) {
// create a dummy exchange as Exchange is needed for expression evaluation
Exchange dummy = new DefaultExchange(endpoint.getCamelContext());
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sat Dec 26 12:00:30 2009
@@ -47,6 +47,8 @@
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Service;
import org.apache.camel.ServiceStatus;
+import org.apache.camel.ShutdownRoute;
+import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.TypeConverter;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.impl.converter.DefaultTypeConverter;
@@ -79,6 +81,7 @@
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.Registry;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RouteStartupOrder;
import org.apache.camel.spi.ServicePool;
import org.apache.camel.spi.ShutdownStrategy;
import org.apache.camel.spi.TypeConverterRegistry;
@@ -146,9 +149,11 @@
private NodeIdFactory nodeIdFactory = new DefaultNodeIdFactory();
private Tracer defaultTracer;
private InflightRepository inflightRepository = new DefaultInflightRepository();
- private final List<Consumer> routeStartupOrder = new ArrayList<Consumer>();
+ private final List<RouteStartupOrder> routeStartupOrder = new ArrayList<RouteStartupOrder>();
private int defaultRouteStartupOrder = 1000;
private ShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy();
+ private ShutdownRoute shutdownRoute = ShutdownRoute.Default;
+ private ShutdownRunningTask shutdownRunningTask = ShutdownRunningTask.CompleteCurrentTaskOnly;
public DefaultCamelContext() {
super();
@@ -499,7 +504,7 @@
*
* @return a list ordered by the starting order of the route inputs
*/
- public List<Consumer> getRouteStartupOrder() {
+ public List<RouteStartupOrder> getRouteStartupOrder() {
return routeStartupOrder;
}
@@ -892,7 +897,7 @@
synchronized (this) {
// list of inputs to start when all the routes have been prepared for starting
// we use a tree map so the routes will be ordered according to startup order defined on the route
- Map<Integer, StartupRouteHolder> inputs = new TreeMap<Integer, StartupRouteHolder>();
+ Map<Integer, DefaultRouteStartupOrder> inputs = new TreeMap<Integer, DefaultRouteStartupOrder>();
// figure out the order in which the routes should be started
for (RouteService routeService : routeServices.values()) {
@@ -915,17 +920,17 @@
}
// create holder object that contains information about this route to be started
- StartupRouteHolder holder = null;
+ DefaultRouteStartupOrder holder = null;
for (Map.Entry<Route, Consumer> entry : routeService.getInputs().entrySet()) {
if (holder == null) {
- holder = new StartupRouteHolder(startupOrder, entry.getKey());
+ holder = new DefaultRouteStartupOrder(startupOrder, entry.getKey());
}
// add the input consumer to the holder
holder.addInput(entry.getValue());
}
// check for clash by startupOrder id
- StartupRouteHolder other = inputs.get(startupOrder);
+ DefaultRouteStartupOrder other = inputs.get(startupOrder);
if (other != null) {
String otherId = other.getRoute().getId();
throw new FailedToStartRouteException(holder.getRoute().getId(), "starupOrder clash. Route " + otherId + " already has startupOrder "
@@ -970,13 +975,13 @@
// now start the inputs for all the route services as we have prepared Camel
// yeah open the floods so messages can start flow into Camel
- for (Map.Entry<Integer, StartupRouteHolder> entry : inputs.entrySet()) {
+ for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
Integer order = entry.getKey();
Route route = entry.getValue().getRoute();
List<Consumer> consumers = entry.getValue().getInputs();
for (Consumer consumer : consumers) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Starting consumer (order: " + order + ") on route: " + route.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting consumer (order: " + order + ") on route: " + route.getId());
}
for (LifecycleStrategy strategy : lifecycleStrategies) {
strategy.onServiceAdd(this, consumer, route);
@@ -984,7 +989,7 @@
ServiceHelper.startService(consumer);
// add to the order which they was started, so we know how to stop them in reverse order
- routeStartupOrder.add(consumer);
+ routeStartupOrder.add(entry.getValue());
}
}
}
@@ -996,6 +1001,8 @@
LOG.debug("... Routes started");
}
+ LOG.info("Started " + getRoutes().size() + " routes");
+
LOG.info("Apache Camel " + getVersion() + " (CamelContext:" + getName() + ") started");
EventHelper.notifyCamelContextStarted(this);
}
@@ -1428,6 +1435,22 @@
this.shutdownStrategy = shutdownStrategy;
}
+ public ShutdownRoute getShutdownRoute() {
+ return shutdownRoute;
+ }
+
+ public void setShutdownRoute(ShutdownRoute shutdownRoute) {
+ this.shutdownRoute = shutdownRoute;
+ }
+
+ public ShutdownRunningTask getShutdownRunningTask() {
+ return shutdownRunningTask;
+ }
+
+ public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
+ this.shutdownRunningTask = shutdownRunningTask;
+ }
+
protected String getEndpointKey(String uri, Endpoint endpoint) {
if (endpoint.isSingleton()) {
return uri;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Sat Dec 26 12:00:30 2009
@@ -28,6 +28,8 @@
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.Route;
+import org.apache.camel.ShutdownRoute;
+import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.management.InstrumentationProcessor;
import org.apache.camel.model.FromDefinition;
import org.apache.camel.model.ProcessorDefinition;
@@ -62,6 +64,8 @@
private Long delay;
private Boolean autoStartup = Boolean.TRUE;
private RoutePolicy routePolicy;
+ private ShutdownRoute shutdownRoute;
+ private ShutdownRunningTask shutdownRunningTask;
public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) {
this.camelContext = camelContext;
@@ -282,6 +286,32 @@
return true;
}
+ public void setShutdownRoute(ShutdownRoute shutdownRoute) {
+ this.shutdownRoute = shutdownRoute;
+ }
+
+ public ShutdownRoute getShutdownRoute() {
+ if (shutdownRoute != null) {
+ return shutdownRoute;
+ } else {
+ // fallback to the option from camel context
+ return getCamelContext().getShutdownRoute();
+ }
+ }
+
+ public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
+ this.shutdownRunningTask = shutdownRunningTask;
+ }
+
+ public ShutdownRunningTask getShutdownRunningTask() {
+ if (shutdownRunningTask != null) {
+ return shutdownRunningTask;
+ } else {
+ // fallback to the option from camel context
+ return getCamelContext().getShutdownRunningTask();
+ }
+ }
+
public RoutePolicy getRoutePolicy() {
return routePolicy;
}
Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java (from r893726, camel/trunk/camel-core/src/main/java/org/apache/camel/impl/StartupRouteHolder.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/impl/StartupRouteHolder.java&r1=893726&r2=893963&rev=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/StartupRouteHolder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java Sat Dec 26 12:00:30 2009
@@ -21,6 +21,7 @@
import org.apache.camel.Consumer;
import org.apache.camel.Route;
+import org.apache.camel.spi.RouteStartupOrder;
/**
* Information about a route to be started where we want to control the order
@@ -28,13 +29,13 @@
*
* @version $Revision$
*/
-class StartupRouteHolder {
+public class DefaultRouteStartupOrder implements RouteStartupOrder {
private final int startupOrder;
private final Route route;
private final List<Consumer> inputs = new ArrayList<Consumer>();
- StartupRouteHolder(int startupOrder, Route route) {
+ public DefaultRouteStartupOrder(int startupOrder, Route route) {
this.startupOrder = startupOrder;
this.route = route;
}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteStartupOrder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Sat Dec 26 12:00:30 2009
@@ -26,7 +26,11 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
+import org.apache.camel.Route;
+import org.apache.camel.ShutdownRoute;
+import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.SuspendableService;
+import org.apache.camel.spi.RouteStartupOrder;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.ShutdownStrategy;
import org.apache.camel.util.EventHelper;
@@ -46,7 +50,7 @@
* for a long time, and hence why a timeout value can be set. When the timeout triggers you can also
* specify whether the remainder consumers should be shutdown now or ignore.
* <p/>
- * Will by default use a timeout of 5 minutes by which it will shutdown now the remaining consumers.
+ * Will by default use a timeout of 300 seconds (5 minutes) by which it will shutdown now the remaining consumers.
* This ensures that when shutting down Camel it at some point eventually will shutdown.
* This behavior can of course be configured using the {@link #setTimeout(long)} and
* {@link #setShutdownNowOnTimeout(boolean)} methods.
@@ -61,7 +65,7 @@
private TimeUnit timeUnit = TimeUnit.SECONDS;
private boolean shutdownNowOnTimeout = true;
- public void shutdown(CamelContext context, List<Consumer> consumers) throws Exception {
+ public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
long start = System.currentTimeMillis();
@@ -72,7 +76,7 @@
}
// use another thread to perform the shutdowns so we can support timeout
- Future future = getExecutorService().submit(new ShutdownTask(context, consumers));
+ Future future = getExecutorService().submit(new ShutdownTask(context, routes));
try {
if (timeout > 0) {
future.get(timeout, timeUnit);
@@ -85,8 +89,8 @@
if (shutdownNowOnTimeout) {
LOG.warn("Timeout occurred. Now forcing all routes to be shutdown now.");
- // force the consumers to shutdown now
- shutdownNow(consumers);
+ // force the routes to shutdown now
+ shutdownRoutesNow(routes);
} else {
LOG.warn("Timeout occurred. Will ignore shutting down the remainder route input consumers.");
}
@@ -129,6 +133,29 @@
/**
* Shutdown all the consumers immediately.
*
+ * @param routes the routes to shutdown
+ */
+ protected void shutdownRoutesNow(List<RouteStartupOrder> routes) {
+ for (RouteStartupOrder order : routes) {
+
+ // set the route to shutdown as fast as possible by stopping after
+ // it has completed its current task
+ ShutdownRunningTask current = order.getRoute().getRouteContext().getShutdownRunningTask();
+ if (current != ShutdownRunningTask.CompleteCurrentTaskOnly) {
+ LOG.info("Changing shutdownRunningTask from " + current + " to " + ShutdownRunningTask.CompleteCurrentTaskOnly
+ + " on route " + order.getRoute().getId() + " to shutdown faster");
+ order.getRoute().getRouteContext().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly);
+ }
+
+ for (Consumer consumer : order.getInputs()) {
+ shutdownNow(consumer);
+ }
+ }
+ }
+
+ /**
+ * Shutdown all the consumers immediately.
+ *
* @param consumers the consumers to shutdown
*/
protected void shutdownNow(List<Consumer> consumers) {
@@ -204,70 +231,125 @@
executor = null;
}
+ class ShutdownDeferredConsumer {
+ private final Route route;
+ private final Consumer consumer;
+
+ ShutdownDeferredConsumer(Route route, Consumer consumer) {
+ this.route = route;
+ this.consumer = consumer;
+ }
+
+ Route getRoute() {
+ return route;
+ }
+
+ Consumer getConsumer() {
+ return consumer;
+ }
+ }
+
/**
* Shutdown task which shutdown all the routes in a graceful manner.
*/
class ShutdownTask implements Runnable {
private final CamelContext context;
- private final List<Consumer> consumers;
+ private final List<RouteStartupOrder> routes;
- public ShutdownTask(CamelContext context, List<Consumer> consumers) {
+ public ShutdownTask(CamelContext context, List<RouteStartupOrder> routes) {
this.context = context;
- this.consumers = consumers;
+ this.routes = routes;
}
public void run() {
+ // the strategy in this run method is to
+ // 1) go over the routes and shutdown those routes which can be shutdown asap
+ // some routes will be deferred to shutdown at the end, as they are needed
+ // by other routes so they can complete their tasks
+ // 2) wait until all inflight and pending exchanges have been completed
+ // 3) shutdown the deferred routes
+
if (LOG.isDebugEnabled()) {
- LOG.debug("There are " + consumers.size() + " routes to shutdown");
+ LOG.debug("There are " + routes.size() + " routes to shutdown");
}
// list of deferred consumers to shutdown when all exchanges has been completed routed
// and thus there are no more inflight exchanges so they can be safely shutdown at that time
- List<Consumer> deferredConsumers = new ArrayList<Consumer>();
+ List<ShutdownDeferredConsumer> deferredConsumers = new ArrayList<ShutdownDeferredConsumer>();
+
+ for (RouteStartupOrder order : routes) {
- for (Consumer consumer : consumers) {
+ ShutdownRoute shutdownRoute = order.getRoute().getRouteContext().getShutdownRoute();
+ ShutdownRunningTask shutdownRunningTask = order.getRoute().getRouteContext().getShutdownRunningTask();
- // some consumers do not support shutting down so let them decide
- // if a consumer is suspendable then prefer to use that and then shutdown later
- boolean shutdown = true;
- boolean suspend = false;
- if (consumer instanceof ShutdownAware) {
- shutdown = ((ShutdownAware) consumer).deferShutdown();
- } else if (consumer instanceof SuspendableService) {
- shutdown = false;
- suspend = true;
+ // TODO: shutdownRunningTask should be implemented in various consumers
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Shutting down route: " + order.getRoute().getId() + " with options [" + shutdownRoute + "," + shutdownRunningTask + "]");
}
- if (suspend) {
- // only suspend it and then later shutdown it
- suspendNow((SuspendableService) consumer, consumer);
- // add it to the deferred list so the route will be shutdown later
- deferredConsumers.add(consumer);
- } else if (shutdown) {
- shutdownNow(consumer);
- } else {
- // we will stop it later, but for now it must run to be able to help all inflight messages
- // be safely completed
- deferredConsumers.add(consumer);
+ for (Consumer consumer : order.getInputs()) {
+
+ boolean suspend = false;
+
+ // assume we should shutdown if we are not deferred
+ boolean shutdown = shutdownRoute != ShutdownRoute.Defer;
+
+ if (shutdown) {
+ // if we are to shutdown then check whether we can suspend instead as it's a more
+ // gentle way to do a graceful shutdown
+
+ // some consumers do not support shutting down so let them decide
+ // if a consumer is suspendable then prefer to use that and then shutdown later
+ if (consumer instanceof ShutdownAware) {
+ shutdown = ((ShutdownAware) consumer).deferShutdown();
+ } else if (consumer instanceof SuspendableService) {
+ suspend = true;
+ }
+ }
+
+ if (suspend) {
+ // only suspend it and then later shutdown it
+ suspendNow((SuspendableService) consumer, consumer);
+ // add it to the deferred list so the route will be shutdown later
+ deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
+ LOG.info("Route: " + order.getRoute().getId() + " suspended and shutdown deferred.");
+ } else if (shutdown) {
+ shutdownNow(consumer);
+ LOG.info("Route: " + order.getRoute().getId() + " shutdown complete.");
+ } else {
+ // we will stop it later, but for now it must run to be able to help all inflight messages
+ // be safely completed
+ deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
+ LOG.info("Route: " + order.getRoute().getId() + " shutdown deferred.");
+ }
}
}
- // wait till there are no more pending inflight messages
+ // wait till there are no more pending and inflight messages
boolean done = false;
while (!done) {
int size = 0;
- for (Consumer consumer : consumers) {
- size += context.getInflightRepository().size(consumer.getEndpoint());
- // include any additional pending exchanges on some consumers which may have internal
- // memory queues such as seda
- if (consumer instanceof ShutdownAware) {
- size += ((ShutdownAware) consumer).getPendingExchangesSize();
+ for (RouteStartupOrder order : routes) {
+ for (Consumer consumer : order.getInputs()) {
+ int inflight = context.getInflightRepository().size(consumer.getEndpoint());
+ // include any additional pending exchanges on some consumers which may have internal
+ // memory queues such as seda
+ if (consumer instanceof ShutdownAware) {
+ inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
+ }
+ if (inflight > 0) {
+ size += inflight;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(inflight + " inflight and pending exchanges for consumer: " + consumer);
+ }
+ }
}
}
if (size > 0) {
try {
- LOG.info("Waiting as there are still " + size + " inflight exchanges to complete before we can shutdown");
+ LOG.info("Waiting as there are still " + size + " inflight and pending exchanges to complete before we can shutdown");
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting during graceful shutdown, will force shutdown now.");
@@ -280,7 +362,10 @@
}
// now all messages has been completed then stop the deferred consumers
- shutdownNow(deferredConsumers);
+ for (ShutdownDeferredConsumer deferred : deferredConsumers) {
+ shutdownNow(deferred.getConsumer());
+ LOG.info("Route: " + deferred.getRoute().getId() + " shutdown complete.");
+ }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementAgent.java Sat Dec 26 12:00:30 2009
@@ -374,7 +374,7 @@
try {
LocateRegistry.createRegistry(registryPort);
if (LOG.isDebugEnabled()) {
- LOG.debug("Created JMXConnector RMI regisry on port " + registryPort);
+ LOG.debug("Created JMXConnector RMI registry on port " + registryPort);
}
} catch (RemoteException ex) {
// The registry may had been created, we could get the registry instead
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java Sat Dec 26 12:00:30 2009
@@ -34,6 +34,8 @@
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Route;
import org.apache.camel.ServiceStatus;
+import org.apache.camel.ShutdownRoute;
+import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.ErrorHandlerBuilderRef;
import org.apache.camel.builder.RouteBuilder;
@@ -67,6 +69,8 @@
private Integer startupOrder;
private RoutePolicy routePolicy;
private String routePolicyRef;
+ private ShutdownRoute shutdownRoute;
+ private ShutdownRunningTask shutdownRunningTask;
public RouteDefinition() {
}
@@ -396,6 +400,26 @@
return this;
}
+ /**
+ * Configures a shutdown route option.
+ *
+ * @param shutdownRoute the option to use when shutting down this route
+ */
+ public RouteDefinition shutdownRoute(ShutdownRoute shutdownRoute) {
+ setShutdownRoute(shutdownRoute);
+ return this;
+ }
+
+ /**
+ * Configures a shutdown running task option.
+ *
+ * @param shutdownRunningTask the option to use when shutting down and how to act upon running tasks.
+ */
+ public RouteDefinition shutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
+ setShutdownRunningTask(shutdownRunningTask);
+ return this;
+ }
+
// Properties
// -----------------------------------------------------------------------
@@ -543,6 +567,24 @@
return routePolicy;
}
+ public ShutdownRoute getShutdownRoute() {
+ return shutdownRoute;
+ }
+
+ @XmlAttribute
+ public void setShutdownRoute(ShutdownRoute shutdownRoute) {
+ this.shutdownRoute = shutdownRoute;
+ }
+
+ public ShutdownRunningTask getShutdownRunningTask() {
+ return shutdownRunningTask;
+ }
+
+ @XmlAttribute
+ public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
+ this.shutdownRunningTask = shutdownRunningTask;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
protected RouteContext addRoutes(Collection<Route> routes, FromDefinition fromType) throws Exception {
@@ -624,6 +666,14 @@
routeContext.setAutoStartup(isAutoStartup());
}
+ // configure shutdown
+ if (shutdownRoute != null) {
+ routeContext.setShutdownRoute(getShutdownRoute());
+ }
+ if (shutdownRunningTask != null) {
+ routeContext.setShutdownRunningTask(getShutdownRunningTask());
+ }
+
// should inherit the intercept strategies we have defined
routeContext.setInterceptStrategies(this.getInterceptStrategies());
// force endpoint resolution
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java?rev=893963&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java Sat Dec 26 12:00:30 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.List;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Route;
+
+/**
+ * A holder to link a {@link Route} to its input {@link Consumer}s and its startup order.
+ *
+ * @version $Revision$
+ */
+public interface RouteStartupOrder {
+
+ Route getRoute();
+
+ List<Consumer> getInputs();
+
+ int getStartupOrder();
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteStartupOrder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java Sat Dec 26 12:00:30 2009
@@ -20,7 +20,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
-import org.apache.camel.Consumer;
import org.apache.camel.Service;
/**
@@ -44,11 +43,11 @@
/**
* Shutdown the routes
*
- * @param context the camel context
- * @param consumers the consumers for the routes, ordered by the order they was started
+ * @param context the camel context
+ * @param routes the routes, ordered by the order they were started
* @throws Exception is thrown if error shutting down the consumers, however its preferred to avoid this
*/
- void shutdown(CamelContext context, List<Consumer> consumers) throws Exception;
+ void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception;
/**
* Set an timeout to wait for the shutdown to complete.
@@ -86,6 +85,9 @@
/**
* Sets whether to force shutdown of all consumers when a timeout occurred and thus
* not all consumers was shutdown within that period.
+ * <p/>
+ * You should have good reasons to set this option to <tt>false</tt> as it means that the routes
+ * keep running and are halted abruptly when {@link CamelContext} has been shut down.
*
* @param shutdownNowOnTimeout <tt>true</tt> to force shutdown, <tt>false</tt> to leave them running
*/
Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/jaxb.index
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/jaxb.index?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/jaxb.index (original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/jaxb.index Sat Dec 26 12:00:30 2009
@@ -16,4 +16,6 @@
## ------------------------------------------------------------------------
ExchangePattern
LoggingLevel
-ManagementStatisticsLevel
\ No newline at end of file
+ManagementStatisticsLevel
+ShutdownRoute
+ShutdownRunningTask
\ No newline at end of file
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java (from r893766, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java&r1=893766&r2=893963&rev=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java Sat Dec 26 12:00:30 2009
@@ -24,12 +24,14 @@
/**
* @version $Revision$
*/
-public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
+public class PendingExchangesTwoRouteShutdownGracefulTest extends ContextTestSupport {
private static String foo = "";
+ private static String bar = "";
public void testShutdownGraceful() throws Exception {
getMockEndpoint("mock:foo").expectedMessageCount(1);
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
template.sendBody("seda:foo", "A");
template.sendBody("seda:foo", "B");
@@ -37,14 +39,22 @@
template.sendBody("seda:foo", "D");
template.sendBody("seda:foo", "E");
+ template.sendBody("seda:bar", "A");
+ template.sendBody("seda:bar", "B");
+ template.sendBody("seda:bar", "C");
+ template.sendBody("seda:bar", "D");
+ template.sendBody("seda:bar", "E");
+
assertMockEndpointsSatisfied();
// now stop the route before its complete
foo = foo + "stop";
+ bar = bar + "stop";
context.stop();
// it should wait as there was 1 inflight exchange and 4 pending messages left
assertEquals("Should graceful shutdown", "stopABCDE", foo);
+ assertEquals("Should graceful shutdown", "stopABCDE", bar);
}
@Override
@@ -57,6 +67,12 @@
foo = foo + exchange.getIn().getBody(String.class);
}
});
+
+ from("seda:bar").to("mock:bar").delay(500).process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ bar = bar + exchange.getIn().getBody(String.class);
+ }
+ });
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSimpleTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSimpleTest.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSimpleTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSimpleTest.java Sat Dec 26 12:00:30 2009
@@ -18,11 +18,11 @@
import java.util.List;
-import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spi.RouteStartupOrder;
/**
* @version $Revision$
@@ -39,11 +39,11 @@
// assert correct order
DefaultCamelContext dcc = (DefaultCamelContext) context;
- List<Consumer> order = dcc.getRouteStartupOrder();
+ List<RouteStartupOrder> order = dcc.getRouteStartupOrder();
assertEquals(2, order.size());
- assertEquals("direct://start", order.get(0).getEndpoint().getEndpointUri());
- assertEquals("seda://foo", order.get(1).getEndpoint().getEndpointUri());
+ assertEquals("direct://start", order.get(0).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("seda://foo", order.get(1).getRoute().getEndpoint().getEndpointUri());
}
@Override
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java Sat Dec 26 12:00:30 2009
@@ -18,11 +18,11 @@
import java.util.List;
-import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spi.RouteStartupOrder;
/**
* @version $Revision$
@@ -39,13 +39,13 @@
// assert correct order
DefaultCamelContext dcc = (DefaultCamelContext) context;
- List<Consumer> order = dcc.getRouteStartupOrder();
+ List<RouteStartupOrder> order = dcc.getRouteStartupOrder();
assertEquals(4, order.size());
- assertEquals("seda://foo", order.get(0).getEndpoint().getEndpointUri());
- assertEquals("direct://start", order.get(1).getEndpoint().getEndpointUri());
- assertEquals("seda://bar", order.get(2).getEndpoint().getEndpointUri());
- assertEquals("direct://bar", order.get(3).getEndpoint().getEndpointUri());
+ assertEquals("seda://foo", order.get(0).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("direct://start", order.get(1).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("seda://bar", order.get(2).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("direct://bar", order.get(3).getRoute().getEndpoint().getEndpointUri());
}
@Override
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderWithDefaultTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderWithDefaultTest.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderWithDefaultTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderWithDefaultTest.java Sat Dec 26 12:00:30 2009
@@ -18,11 +18,11 @@
import java.util.List;
-import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spi.RouteStartupOrder;
/**
* @version $Revision$
@@ -39,15 +39,15 @@
// assert correct order
DefaultCamelContext dcc = (DefaultCamelContext) context;
- List<Consumer> order = dcc.getRouteStartupOrder();
+ List<RouteStartupOrder> order = dcc.getRouteStartupOrder();
assertEquals(5, order.size());
- assertEquals("seda://foo", order.get(0).getEndpoint().getEndpointUri());
- assertEquals("direct://start", order.get(1).getEndpoint().getEndpointUri());
- assertEquals("seda://bar", order.get(2).getEndpoint().getEndpointUri());
- assertEquals("direct://bar", order.get(3).getEndpoint().getEndpointUri());
+ assertEquals("seda://foo", order.get(0).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("direct://start", order.get(1).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("seda://bar", order.get(2).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("direct://bar", order.get(3).getRoute().getEndpoint().getEndpointUri());
// the one with no startup order should be last
- assertEquals("direct://default", order.get(4).getEndpoint().getEndpointUri());
+ assertEquals("direct://default", order.get(4).getRoute().getEndpoint().getEndpointUri());
}
@Override
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java?rev=893963&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java Sat Dec 26 12:00:30 2009
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+import static org.apache.camel.ShutdownRoute.Defer;
+
+/**
+ * @version $Revision$
+ */
+public class ShutdownDeferTest extends ContextTestSupport {
+
+ public void testShutdownDeferred() throws Exception {
+ deleteDirectory("target/deferred");
+
+ MockEndpoint bar = getMockEndpoint("mock:bar");
+ bar.expectedMinimumMessageCount(1);
+ bar.setResultWaitTime(3000);
+
+ template.sendBody("seda:foo", "A");
+ template.sendBody("seda:foo", "B");
+ template.sendBody("seda:foo", "C");
+ template.sendBody("seda:foo", "D");
+ template.sendBody("seda:foo", "E");
+ template.sendBody("seda:foo", "F");
+ template.sendBody("seda:foo", "G");
+ template.sendBody("seda:foo", "H");
+
+ assertMockEndpointsSatisfied();
+
+ context.stop();
+
+ // should route all 8
+ assertEquals("Should complete all 8 messages", 8, bar.getReceivedCounter());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ // START SNIPPET: e1
+ public void configure() throws Exception {
+ from("seda:foo")
+ .startupOrder(1)
+ .delay(1000).to("file://target/deferred");
+
+ // use file component to transfer files from route 1 -> route 2 as it
+ // will normally suspend, but by deferring this we can let route 1
+ // complete while shutting down
+ from("file://target/deferred")
+ // defer shutting down this route as the 1st route depends upon it
+ .startupOrder(2).shutdownRoute(Defer)
+ .to("mock:bar");
+ }
+ // END SNIPPET: e1
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java?rev=893963&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java Sat Dec 26 12:00:30 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+import static org.apache.camel.ShutdownRoute.Default;
+
+/**
+ * @version $Revision$
+ */
+public class ShutdownNotDeferTest extends ContextTestSupport {
+
+ public void testShutdownNotDeferred() throws Exception {
+ deleteDirectory("target/deferred");
+
+ MockEndpoint bar = getMockEndpoint("mock:bar");
+ bar.expectedMinimumMessageCount(1);
+ bar.setResultWaitTime(3000);
+
+ template.sendBody("seda:foo", "A");
+ template.sendBody("seda:foo", "B");
+ template.sendBody("seda:foo", "C");
+ template.sendBody("seda:foo", "D");
+ template.sendBody("seda:foo", "E");
+ template.sendBody("seda:foo", "F");
+ template.sendBody("seda:foo", "G");
+ template.sendBody("seda:foo", "H");
+
+ assertMockEndpointsSatisfied();
+
+ context.stop();
+
+ // should NOT have routed all 8, as this route is not deferred during shutdown
+ assertTrue("Should NOT complete all 8 messages, was " + bar.getReceivedCounter(), bar.getReceivedCounter() < 8);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo")
+ .startupOrder(1)
+ .delay(1000).to("file://target/deferred");
+
+ // use file component to transfer files from route 1 -> route 2
+ from("file://target/deferred")
+ // do NOT defer it but use default for testing this
+ .startupOrder(2).shutdownRoute(Default)
+ .to("mock:bar");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownNotDeferTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java Sat Dec 26 12:00:30 2009
@@ -23,6 +23,7 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
+import org.apache.camel.ShutdownRoute;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
@@ -75,7 +76,11 @@
// lets setup the heartbeats
from("bean:service1?method=status").to("activemq:topic:registry.heartbeats");
- from("activemq:topic:registry.heartbeats").to("bean:registry?method=onEvent", "mock:result");
+ // defer shutting this route down as the first route depends upon it to
+ // be running so it can complete its current exchanges
+ from("activemq:topic:registry.heartbeats")
+ .shutdownRoute(ShutdownRoute.Defer)
+ .to("bean:registry?method=onEvent", "mock:result");
}
};
}
Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java?rev=893963&r1=893962&r2=893963&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java Sat Dec 26 12:00:30 2009
@@ -30,6 +30,8 @@
import org.apache.camel.CamelException;
import org.apache.camel.RoutesBuilder;
+import org.apache.camel.ShutdownRoute;
+import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.builder.ErrorHandlerBuilderRef;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.management.DefaultManagementAgent;
@@ -109,6 +111,10 @@
private String errorHandlerRef;
@XmlAttribute(required = false)
private Boolean autoStartup = Boolean.TRUE;
+ @XmlAttribute(required = false)
+ private ShutdownRoute shutdownRoute;
+ @XmlAttribute(required = false)
+ private ShutdownRunningTask shutdownRunningTask;
@XmlElement(name = "properties", required = false)
private PropertiesDefinition properties;
@XmlElement(name = "package", required = false)
@@ -787,6 +793,22 @@
this.autoStartup = autoStartup;
}
+ public ShutdownRoute getShutdownRoute() {
+ return shutdownRoute;
+ }
+
+ public void setShutdownRoute(ShutdownRoute shutdownRoute) {
+ this.shutdownRoute = shutdownRoute;
+ }
+
+ public ShutdownRunningTask getShutdownRunningTask() {
+ return shutdownRunningTask;
+ }
+
+ public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
+ this.shutdownRunningTask = shutdownRunningTask;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
@@ -797,22 +819,28 @@
SpringCamelContext ctx = new SpringCamelContext(getApplicationContext());
ctx.setName(getId());
if (streamCache != null) {
- ctx.setStreamCaching(streamCache);
+ ctx.setStreamCaching(getStreamCache());
}
if (trace != null) {
- ctx.setTracing(trace);
+ ctx.setTracing(getTrace());
}
if (delayer != null) {
- ctx.setDelayer(delayer);
+ ctx.setDelayer(getDelayer());
}
if (handleFault != null) {
- ctx.setHandleFault(handleFault);
+ ctx.setHandleFault(getHandleFault());
}
if (errorHandlerRef != null) {
- ctx.setErrorHandlerBuilder(new ErrorHandlerBuilderRef(errorHandlerRef));
+ ctx.setErrorHandlerBuilder(new ErrorHandlerBuilderRef(getErrorHandlerRef()));
}
if (autoStartup != null) {
- ctx.setAutoStartup(autoStartup);
+ ctx.setAutoStartup(isAutoStartup());
+ }
+ if (shutdownRoute != null) {
+ ctx.setShutdownRoute(getShutdownRoute());
+ }
+ if (shutdownRunningTask != null) {
+ ctx.setShutdownRunningTask(getShutdownRunningTask());
}
return ctx;
}
Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownDeferTest.java (from r893766, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPBeforeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownDeferTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownDeferTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPBeforeTest.java&r1=893766&r2=893963&rev=893963&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPBeforeTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownDeferTest.java Sat Dec 26 12:00:30 2009
@@ -17,15 +17,16 @@
package org.apache.camel.spring.processor;
import org.apache.camel.CamelContext;
-import org.apache.camel.processor.AOPBeforeTest;
+import org.apache.camel.processor.ShutdownDeferTest;
+
import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
/**
* @version $Revision$
*/
-public class SpringAOPBeforeTest extends AOPBeforeTest {
+public class SpringShutdownDeferTest extends ShutdownDeferTest {
protected CamelContext createCamelContext() throws Exception {
- return createSpringCamelContext(this, "org/apache/camel/spring/processor/aopbefore.xml");
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/ShutdownDeferTest.xml");
}
}
\ No newline at end of file
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownDeferTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownDeferTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownNotDeferTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownNotDeferTest.java?rev=893963&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownNotDeferTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownNotDeferTest.java Sat Dec 26 12:00:30 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.ShutdownNotDeferTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version $Revision$
+ */
+public class SpringShutdownNotDeferTest extends ShutdownNotDeferTest {
+
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/ShutdownNotDeferTest.xml");
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownNotDeferTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringShutdownNotDeferTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownDeferTest.xml (from r893766, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopbefore.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownDeferTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownDeferTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopbefore.xml&r1=893766&r2=893963&rev=893963&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopbefore.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownDeferTest.xml Sat Dec 26 12:00:30 2009
@@ -24,12 +24,16 @@
<!-- START SNIPPET: e1 -->
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="direct:start"/>
- <aop beforeUri="mock:before">
- <transform><constant>Bye World</constant></transform>
- <to uri="mock:result"/>
- </aop>
+ <route startupOrder="1">
+ <from uri="seda:foo"/>
+ <delay><constant>1000</constant></delay>
+ <to uri="file://target/deferred"/>
+ </route>
+
+ <!-- defer shutting down this route as the first route depends upon it -->
+ <route startupOrder="2" shutdownRoute="Defer">
+ <from uri="file://target/deferred"/>
+ <to uri="mock:bar"/>
</route>
</camelContext>
<!-- END SNIPPET: e1 -->
Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownDeferTest.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownDeferTest.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownDeferTest.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownNotDeferTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownNotDeferTest.xml?rev=893963&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownNotDeferTest.xml (added)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownNotDeferTest.xml Sat Dec 26 12:00:30 2009
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ ">
+
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+ <route startupOrder="1">
+ <from uri="seda:foo"/>
+ <delay><constant>1000</constant></delay>
+ <to uri="file://target/deferred"/>
+ </route>
+
+ <route startupOrder="2" shutdownRoute="Default">
+ <from uri="file://target/deferred"/>
+ <to uri="mock:bar"/>
+ </route>
+ </camelContext>
+
+</beans>
Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownNotDeferTest.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownNotDeferTest.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownNotDeferTest.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml