You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/04 15:48:03 UTC
svn commit: r1227175 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/
main/java/org/apache/camel/util/ test/java/org/apache/camel/management/
Author: davsclaus
Date: Wed Jan 4 14:48:02 2012
New Revision: 1227175
URL: http://svn.apache.org/viewvc?rev=1227175&view=rev
Log:
CAMEL-4842: Removing a route should remove its producer cache from JMX, as well as from the services-to-close list on CamelContext, so that it does not eat up memory. Also shut down route-scoped error handlers when removing a route, so that they do not leak memory.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1227175&r1=1227174&r2=1227175&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Wed Jan 4 14:48:02 2012
@@ -28,8 +28,11 @@ import java.util.concurrent.atomic.Atomi
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.Service;
+import org.apache.camel.model.OnExceptionDefinition;
+import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.RouteContext;
@@ -192,6 +195,8 @@ public class RouteService extends ChildS
LOG.debug("Stopping services on route: {}", route.getId());
// getServices will not add services again
List<Service> services = route.getServices();
+ // also get route scoped services
+ doGetRouteScopedServices(services, route);
// gather list of services to stop as we need to start child services as well
Set<Service> list = new LinkedHashSet<Service>();
@@ -228,12 +233,15 @@ public class RouteService extends ChildS
for (Route route : routes) {
LOG.debug("Shutting down services on route: {}", route.getId());
List<Service> services = route.getServices();
+ // also get route scoped child services
+ doGetRouteScopedServices(services, route);
// gather list of services to stop as we need to start child services as well
Set<Service> list = new LinkedHashSet<Service>();
for (Service service : services) {
doGetChildServices(list, service);
}
+
// shutdown services
stopChildService(route, list, true);
@@ -280,7 +288,6 @@ public class RouteService extends ChildS
@Override
protected void doResume() throws Exception {
-
// suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy
// to safely suspend and resume
for (Route route : routes) {
@@ -319,7 +326,24 @@ public class RouteService extends ChildS
}
/**
- * Need to recursive start child services for routes
+ * Gather all route scoped services from the given route, such as route scoped error handler.
+ */
+ private void doGetRouteScopedServices(List<Service> services, Route route) {
+ for (ProcessorDefinition output : route.getRouteContext().getRoute().getOutputs()) {
+ if (output instanceof OnExceptionDefinition) {
+ OnExceptionDefinition onExceptionDefinition = (OnExceptionDefinition) output;
+ if (onExceptionDefinition.isRouteScoped()) {
+ Processor errorHandler = onExceptionDefinition.getErrorHandler(route.getId());
+ if (errorHandler != null && errorHandler instanceof Service) {
+ services.add((Service) errorHandler);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Gather all child services by navigating the service to recursively gather all child services.
*/
private static void doGetChildServices(Set<Service> services, Service service) throws Exception {
services.add(service);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1227175&r1=1227174&r2=1227175&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Jan 4 14:48:02 2012
@@ -942,8 +942,15 @@ public class MulticastProcessor extends
return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
}
+ @Override
protected void doStop() throws Exception {
ServiceHelper.stopServices(processors, errorHandlers);
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ ServiceHelper.stopAndShutdownServices(processors, errorHandlers);
+ // only clear error handlers when shutting down
errorHandlers.clear();
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1227175&r1=1227174&r2=1227175&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Wed Jan 4 14:48:02 2012
@@ -1056,6 +1056,6 @@ public abstract class RedeliveryErrorHan
@Override
protected void doShutdown() throws Exception {
- ServiceHelper.stopServices(deadLetter, output, outputAsync);
+ ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=1227175&r1=1227174&r2=1227175&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Wed Jan 4 14:48:02 2012
@@ -151,7 +151,7 @@ public final class ServiceHelper {
/**
* Stops and shuts down all of the given services, throwing the first exception caught
*/
- public static void stopAndShutdownService(Service value) throws Exception {
+ public static void stopAndShutdownService(Object value) throws Exception {
stopService(value);
// then try to shutdown
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java?rev=1227175&r1=1227174&r2=1227175&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java Wed Jan 4 14:48:02 2012
@@ -16,6 +16,8 @@
*/
package org.apache.camel.management;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -184,4 +186,122 @@ public class ManagedRouteAddRemoveTest e
log.info("Shutting down...");
}
+ public void testRouteAddRemoteRouteWithRecipientListAndRouteScopedOnException() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+ template.sendBody("direct:start", "Hello World");
+ result.assertIsSatisfied();
+
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+
+ // number of producer caches
+ Set<ObjectName> names = mbeanServer.queryNames(on, null);
+ assertEquals(1, names.size());
+
+ log.info("Adding 2nd route");
+
+ // add a 2nd route
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:bar").routeId("bar")
+ .onException(Exception.class)
+ .handled(true)
+ .recipientList(header("error"))
+ .end().end()
+ .recipientList(header("bar")).throwException(new IllegalArgumentException("Forced"));
+ }
+ });
+
+ // and send a message to it
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
+ getMockEndpoint("mock:error").expectedMessageCount(1);
+
+ Map headers = new HashMap();
+ headers.put("error", "mock:error");
+ headers.put("bar", "mock:bar");
+ template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
+
+ assertMockEndpointsSatisfied();
+
+ // there should be two more producer caches
+ names = mbeanServer.queryNames(on, null);
+ assertEquals(3, names.size());
+
+ // now stop and remove the 2nd route
+ log.info("Stopping 2nd route");
+ context.stopRoute("bar");
+
+ log.info("Removing 2nd route");
+ boolean removed = context.removeRoute("bar");
+ assertTrue(removed);
+
+ // the producer cache should have been removed
+ on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+ names = mbeanServer.queryNames(on, null);
+ assertEquals(1, names.size());
+
+ log.info("Shutting down...");
+ }
+
+ public void testRouteAddRemoteRouteWithRecipientListAndContextScopedOnException() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+ template.sendBody("direct:start", "Hello World");
+ result.assertIsSatisfied();
+
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+
+ // number of producer caches
+ Set<ObjectName> names = mbeanServer.queryNames(on, null);
+ assertEquals(1, names.size());
+
+ log.info("Adding 2nd route");
+
+ // add a 2nd route
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(Exception.class)
+ .handled(true)
+ .recipientList(header("error"))
+ .end();
+
+ from("direct:bar").routeId("bar")
+ .recipientList(header("bar")).throwException(new IllegalArgumentException("Forced"));
+ }
+ });
+
+ // and send a message to it
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
+ getMockEndpoint("mock:error").expectedMessageCount(1);
+
+ Map headers = new HashMap();
+ headers.put("error", "mock:error");
+ headers.put("bar", "mock:bar");
+ template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
+
+ assertMockEndpointsSatisfied();
+
+ // there should be two more producer caches
+ names = mbeanServer.queryNames(on, null);
+ assertEquals(3, names.size());
+
+ // now stop and remove the 2nd route
+ log.info("Stopping 2nd route");
+ context.stopRoute("bar");
+
+ log.info("Removing 2nd route");
+ boolean removed = context.removeRoute("bar");
+ assertTrue(removed);
+
+ // only the producer cache from the 2nd route should have been removed (the on exception becomes context scoped)
+ on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+ names = mbeanServer.queryNames(on, null);
+ assertEquals(2, names.size());
+
+ log.info("Shutting down...");
+ }
}
\ No newline at end of file