You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/30 10:54:25 UTC
svn commit: r980685 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/spi/
test/java/org/apache/camel/management/ test/java/org/apache/camel/processor/
Author: davsclaus
Date: Fri Jul 30 08:54:25 2010
New Revision: 980685
URL: http://svn.apache.org/viewvc?rev=980685&view=rev
Log:
CAMEL-3012: CamelContext supports suspend/resume to temporary stop a Camel application.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeNoAutoStartupTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeTest.java
- copied, changed from r980647, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierEventsTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=980685&r1=980684&r2=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri Jul 30 08:54:25 2010
@@ -478,8 +478,11 @@ public class DefaultCamelContext extends
/**
* Returns the order in which the route inputs was started.
+ * <p/>
+ * The order may not be according to the startupOrder defined on the route.
+ * For example a route could be started manually later, or new routes added at runtime.
*
- * @return a list ordered by the starting order of the route inputs
+ * @return a list in the order in which the routes were started
*/
public List<RouteStartupOrder> getRouteStartupOrder() {
return routeStartupOrder;
@@ -1003,13 +1006,19 @@ public class DefaultCamelContext extends
}
}
- // suspend routes using the shutdown strategy so it can shutdown in correct order
- // TODO: leverage shutdown strategy to let it run in suspend mode, so it can suspend routes in correct order
+ // assemble list of startup ordering so routes can be shutdown accordingly
+ List<RouteStartupOrder> orders = new ArrayList<RouteStartupOrder>();
for (Map.Entry<String, RouteService> entry : suspendedRouteServices.entrySet()) {
- shutdownRoute(entry.getKey());
+ Route route = entry.getValue().getRoutes().iterator().next();
+ Integer order = entry.getValue().getRouteDefinition().getStartupOrder();
+ if (order == null) {
+ order = defaultRouteStartupOrder++;
+ }
+ orders.add(new DefaultRouteStartupOrder(order, route, entry.getValue()));
}
- // TODO: more unit test to ensure suspend/resume with startup ordering is as expected
+ // suspend routes using the shutdown strategy so it can shutdown in correct order
+ getShutdownStrategy().suspend(this, orders);
watch.stop();
if (LOG.isInfoEnabled()) {
@@ -1094,7 +1103,21 @@ public class DefaultCamelContext extends
// filter out already started routes
Map<String, RouteService> filtered = new LinkedHashMap<String, RouteService>();
for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) {
- if (entry.getValue().getStatus().isStartable()) {
+ boolean startable;
+
+ Consumer consumer = entry.getValue().getRoutes().iterator().next().getConsumer();
+ if (consumer instanceof SuspendableService) {
+ // consumer could be suspended, which is not reflected in the RouteService status
+ startable = ((SuspendableService) consumer).isSuspended();
+ } else if (consumer instanceof ServiceSupport) {
+ // consumer could be stopped, which is not reflected in the RouteService status
+ startable = ((ServiceSupport) consumer).getStatus().isStartable();
+ } else {
+ // no consumer so use state from route service
+ startable = entry.getValue().getStatus().isStartable();
+ }
+
+ if (startable) {
filtered.put(entry.getKey(), entry.getValue());
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=980685&r1=980684&r2=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Fri Jul 30 08:54:25 2010
@@ -18,6 +18,7 @@ package org.apache.camel.impl;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -83,11 +84,28 @@ public class DefaultShutdownStrategy ext
shutdown(context, routes, getTimeout(), getTimeUnit());
}
+ public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception { // suspends the given routes using the timeout and time unit configured on this strategy
+ doShutdown(context, routes, getTimeout(), getTimeUnit(), true); // last argument is suspendOnly: true runs the task in suspend mode
+ }
+
public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception { // shuts down the given routes using the caller supplied timeout
+ doShutdown(context, routes, timeout, timeUnit, false); // last argument is suspendOnly: false runs the task in shutdown mode
+ }
+
+ public void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception { // suspends the given routes using the caller supplied timeout
+ doShutdown(context, routes, timeout, timeUnit, true); // last argument is suspendOnly: true runs the task in suspend mode
+ }
+
+ protected void doShutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit, boolean suspendOnly) throws Exception {
StopWatch watch = new StopWatch();
- // should the order of routes be reversed?
+ // at first sort according to route startup order
List<RouteStartupOrder> routesOrdered = new ArrayList<RouteStartupOrder>(routes);
+ Collections.sort(routesOrdered, new Comparator<RouteStartupOrder>() {
+ public int compare(RouteStartupOrder o1, RouteStartupOrder o2) {
+ return o1.getStartupOrder() - o2.getStartupOrder();
+ }
+ });
if (shutdownRoutesInReverseOrder) {
Collections.reverse(routesOrdered);
}
@@ -99,7 +117,7 @@ public class DefaultShutdownStrategy ext
}
// use another thread to perform the shutdowns so we can support timeout
- Future future = getExecutorService().submit(new ShutdownTask(context, routesOrdered));
+ Future future = getExecutorService().submit(new ShutdownTask(context, routesOrdered, suspendOnly));
try {
if (timeout > 0) {
future.get(timeout, timeUnit);
@@ -227,18 +245,18 @@ public class DefaultShutdownStrategy ext
}
/**
- * Suspends the consumer immediately.
+ * Suspends/stops the consumer immediately.
*
- * @param service the suspendable consumer
* @param consumer the consumer to suspend
*/
- protected void suspendNow(SuspendableService service, Consumer consumer) {
+ protected void suspendNow(Consumer consumer) {
if (LOG.isTraceEnabled()) {
LOG.trace("Suspending: " + consumer);
}
+ // allow us to do custom work before delegating to service helper
try {
- service.suspend();
+ ServiceHelper.suspendService(consumer);
} catch (Throwable e) {
LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.", e);
// fire event
@@ -301,10 +319,12 @@ public class DefaultShutdownStrategy ext
private final CamelContext context;
private final List<RouteStartupOrder> routes;
+ private final boolean suspendOnly;
- public ShutdownTask(CamelContext context, List<RouteStartupOrder> routes) {
+ public ShutdownTask(CamelContext context, List<RouteStartupOrder> routes, boolean suspendOnly) {
this.context = context;
this.routes = routes;
+ this.suspendOnly = suspendOnly;
}
public void run() {
@@ -316,7 +336,7 @@ public class DefaultShutdownStrategy ext
// 3) shutdown the deferred routes
if (LOG.isDebugEnabled()) {
- LOG.debug("There are " + routes.size() + " routes to shutdown");
+ LOG.debug("There are " + routes.size() + " routes to " + (suspendOnly ? "suspend" : "shutdown"));
}
// list of deferred consumers to shutdown when all exchanges has been completed routed
@@ -329,7 +349,8 @@ public class DefaultShutdownStrategy ext
ShutdownRunningTask shutdownRunningTask = order.getRoute().getRouteContext().getShutdownRunningTask();
if (LOG.isTraceEnabled()) {
- LOG.trace("Shutting down route: " + order.getRoute().getId() + " with options [" + shutdownRoute + "," + shutdownRunningTask + "]");
+ LOG.trace((suspendOnly ? "Suspending route: " : "Shutting down route: ") + order.getRoute().getId()
+ + " with options [" + shutdownRoute + "," + shutdownRunningTask + "]");
}
for (Consumer consumer : order.getInputs()) {
@@ -356,7 +377,7 @@ public class DefaultShutdownStrategy ext
if (suspend) {
// only suspend it and then later shutdown it
- suspendNow((SuspendableService) consumer, consumer);
+ suspendNow(consumer);
// add it to the deferred list so the route will be shutdown later
deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
LOG.info("Route: " + order.getRoute().getId() + " suspended and shutdown deferred, was consuming from: "
@@ -369,7 +390,7 @@ public class DefaultShutdownStrategy ext
// we will stop it later, but for now it must run to be able to help all inflight messages
// be safely completed
deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
- LOG.info("Route: " + order.getRoute().getId() + " shutdown deferred.");
+ // the wording must follow the mode this task runs in: suspension when suspendOnly, otherwise shutdown
+ LOG.info("Route: " + order.getRoute().getId() + (suspendOnly ? " suspension deferred." : " shutdown deferred."));
}
}
}
@@ -413,8 +434,13 @@ public class DefaultShutdownStrategy ext
// now all messages has been completed then stop the deferred consumers
for (ShutdownDeferredConsumer deferred : deferredConsumers) {
- shutdownNow(deferred.getConsumer());
- LOG.info("Route: " + deferred.getRoute().getId() + " shutdown complete.");
+ if (suspendOnly) {
+ suspendNow(deferred.getConsumer());
+ LOG.info("Route: " + deferred.getRoute().getId() + " suspend complete.");
+ } else {
+ shutdownNow(deferred.getConsumer());
+ LOG.info("Route: " + deferred.getRoute().getId() + " shutdown complete.");
+ }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=980685&r1=980684&r2=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Fri Jul 30 08:54:25 2010
@@ -147,7 +147,9 @@ public interface ExecutorServiceStrategy
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
* @return the created thread pool
+ * @deprecated using cached thread pool is discouraged as they have no upper bound and can overload the JVM
*/
+ @Deprecated
ExecutorService newCachedThreadPool(Object source, String name);
/**
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java?rev=980685&r1=980684&r2=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java Fri Jul 30 08:54:25 2010
@@ -50,6 +50,15 @@ public interface ShutdownStrategy extend
void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception;
/**
+ * Suspends the routes
+ *
+ * @param context the camel context
+ * @param routes the routes, ordered by the order they were started
+ * @throws Exception is thrown if an error occurs while suspending the consumers, however it's preferred to avoid this
+ */
+ void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception;
+
+ /**
* Shutdown the routes using a specified timeout instead of the default timeout values
*
* @param context the camel context
@@ -61,6 +70,17 @@ public interface ShutdownStrategy extend
void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception;
/**
+ * Suspends the routes using a specified timeout instead of the default timeout values
+ *
+ * @param context the camel context
+ * @param routes the routes, ordered by the order they were started
+ * @param timeout timeout
+ * @param timeUnit the unit to use
+ * @throws Exception is thrown if an error occurs while suspending the consumers, however it's preferred to avoid this
+ */
+ void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception;
+
+ /**
* Set an timeout to wait for the shutdown to complete.
* <p/>
* Setting a value of 0 or negative will disable timeout and wait until complete
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierEventsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierEventsTest.java?rev=980685&r1=980684&r2=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierEventsTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierEventsTest.java Fri Jul 30 08:54:25 2010
@@ -141,19 +141,16 @@ public class EventNotifierEventsTest ext
context.suspend();
- assertEquals(8, events.size());
+ assertEquals(6, events.size());
assertIsInstanceOf(CamelContextSuspendingEvent.class, events.get(4));
- assertIsInstanceOf(RouteStoppedEvent.class, events.get(5));
- assertIsInstanceOf(RouteStoppedEvent.class, events.get(6));
- assertIsInstanceOf(CamelContextSuspendedEvent.class, events.get(7));
+ // notice that direct components are not suspended (as they are internal)
+ assertIsInstanceOf(CamelContextSuspendedEvent.class, events.get(5));
context.resume();
- assertEquals(12, events.size());
- assertIsInstanceOf(CamelContextResumingEvent.class, events.get(8));
- assertIsInstanceOf(RouteStartedEvent.class, events.get(9));
- assertIsInstanceOf(RouteStartedEvent.class, events.get(10));
- assertIsInstanceOf(CamelContextResumedEvent.class, events.get(11));
+ assertEquals(8, events.size());
+ assertIsInstanceOf(CamelContextResumingEvent.class, events.get(6));
+ assertIsInstanceOf(CamelContextResumedEvent.class, events.get(7));
}
@Override
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeNoAutoStartupTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeNoAutoStartupTest.java?rev=980685&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeNoAutoStartupTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeNoAutoStartupTest.java Fri Jul 30 08:54:25 2010
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spi.RouteStartupOrder;
+
+/**
+ * Verifies that suspending and resuming the CamelContext keeps the recorded
+ * route startup order, and that route C (configured with noAutoStartup) is
+ * only running after the resume if it was started manually before the suspend.
+ *
+ * @version $Revision: 893963 $
+ */
+public class RouteStartupOrderSuspendResumeNoAutoStartupTest extends ContextTestSupport {
+
+ public void testRouteStartupOrderSuspendResumeNoAutoStartup() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ context.suspend();
+ context.resume();
+
+ // route C is configured with noAutoStartup and was never started, so it should still be stopped after we have resumed
+ assertEquals(true, context.getRouteStatus("C").isStopped());
+
+ // assert the startup order: A (1), B (2) and then D (9); C is not listed as it was never started
+ DefaultCamelContext dcc = (DefaultCamelContext) context;
+ List<RouteStartupOrder> order = dcc.getRouteStartupOrder();
+
+ assertEquals(3, order.size());
+ assertEquals("seda://foo", order.get(0).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("direct://start", order.get(1).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("direct://bar", order.get(2).getRoute().getEndpoint().getEndpointUri());
+ }
+
+ public void testRouteStartupOrderSuspendResumeStartC() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ // start C manually, as it is configured with noAutoStartup
+ context.startRoute("C");
+
+ context.suspend();
+ context.resume();
+
+ // route C was started before the suspend, so it should be started after the resume
+ assertEquals(true, context.getRouteStatus("C").isStarted());
+
+ // assert the startup order of the auto started routes: A (1), B (2) and then D (9)
+ DefaultCamelContext dcc = (DefaultCamelContext) context;
+ List<RouteStartupOrder> order = dcc.getRouteStartupOrder();
+
+ assertEquals(4, order.size());
+ assertEquals("seda://foo", order.get(0).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("direct://start", order.get(1).getRoute().getEndpoint().getEndpointUri());
+ assertEquals("direct://bar", order.get(2).getRoute().getEndpoint().getEndpointUri());
+
+ // C has startupOrder 5, however it was started manually so it is listed after the auto started routes
+ assertEquals("seda://bar", order.get(3).getRoute().getEndpoint().getEndpointUri());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId("B").startupOrder(2).to("seda:foo");
+
+ from("seda:foo").routeId("A").startupOrder(1).to("mock:result");
+
+ from("direct:bar").routeId("D").startupOrder(9).to("seda:bar");
+
+ from("seda:bar").routeId("C").noAutoStartup().startupOrder(5).to("mock:other");
+ }
+ };
+ }
+}
\ No newline at end of file
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeTest.java (from r980647, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java&r1=980647&r2=980685&rev=980685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStartupOrderSuspendResumeTest.java Fri Jul 30 08:54:25 2010
@@ -27,9 +27,9 @@ import org.apache.camel.spi.RouteStartup
/**
* @version $Revision$
*/
-public class RouteStartupOrderTest extends ContextTestSupport {
+public class RouteStartupOrderSuspendResumeTest extends ContextTestSupport {
- public void testRouteStartupOrder() throws Exception {
+ public void testRouteStartupOrderSuspendResume() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
@@ -37,6 +37,9 @@ public class RouteStartupOrderTest exten
assertMockEndpointsSatisfied();
+ context.suspend();
+ context.resume();
+
// assert correct order
DefaultCamelContext dcc = (DefaultCamelContext) context;
List<RouteStartupOrder> order = dcc.getRouteStartupOrder();
@@ -53,13 +56,13 @@ public class RouteStartupOrderTest exten
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").startupOrder(2).to("seda:foo");
+ from("direct:start").routeId("B").startupOrder(2).to("seda:foo");
- from("seda:foo").startupOrder(1).to("mock:result");
+ from("seda:foo").routeId("A").startupOrder(1).to("mock:result");
- from("direct:bar").startupOrder(9).to("seda:bar");
+ from("direct:bar").routeId("D").startupOrder(9).to("seda:bar");
- from("seda:bar").startupOrder(5).to("mock:other");
+ from("seda:bar").routeId("C").startupOrder(5).to("mock:other");
}
};
}