You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/05 11:44:56 UTC
svn commit: r1227549 - in /camel/branches/camel-2.9.x: ./
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/test/java/org/apache/camel/management/
Author: davsclaus
Date: Thu Jan 5 10:44:56 2012
New Revision: 1227549
URL: http://svn.apache.org/viewvc?rev=1227549&view=rev
Log:
CAMEL-4842: Route scoped onCompletion should also shut down child services when removing routes, to not leak resources.
Modified:
camel/branches/camel-2.9.x/ (props changed)
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 5 10:44:56 2012
@@ -1 +1 @@
-/camel/trunk:1227209,1227212
+/camel/trunk:1227209,1227212,1227540
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Thu Jan 5 10:44:56 2012
@@ -1 +1 @@
-/camel/trunk:1-1227196,1227209,1227212
+/camel/trunk:1-1227196,1227209,1227212,1227540
Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1227549&r1=1227548&r2=1227549&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Thu Jan 5 10:44:56 2012
@@ -31,9 +31,11 @@ import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.Service;
+import org.apache.camel.model.OnCompletionDefinition;
import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.processor.OnCompletionProcessor;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RoutePolicy;
@@ -338,12 +340,20 @@ public class RouteService extends ChildS
services.add((Service) errorHandler);
}
}
+ } else if (output instanceof OnCompletionDefinition) {
+ OnCompletionDefinition onCompletionDefinition = (OnCompletionDefinition) output;
+ if (onCompletionDefinition.isRouteScoped()) {
+ Processor onCompletionProcessor = onCompletionDefinition.getOnCompletion(route.getId());
+ if (onCompletionProcessor != null && onCompletionProcessor instanceof Service) {
+ services.add((Service) onCompletionProcessor);
+ }
+ }
}
}
}
/**
- * Gather all child services by navigating the service to recursivly gather all child services.
+ * Gather all child services by navigating the service to recursively gather all child services.
*/
private static void doGetChildServices(Set<Service> services, Service service) throws Exception {
services.add(service);
Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=1227549&r1=1227548&r2=1227549&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Thu Jan 5 10:44:56 2012
@@ -17,8 +17,11 @@
package org.apache.camel.model;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.xml.bind.annotation.XmlAccessType;
@@ -31,6 +34,7 @@ import javax.xml.bind.annotation.XmlTran
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
+import org.apache.camel.processor.FatalFallbackErrorHandler;
import org.apache.camel.processor.OnCompletionProcessor;
import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.spi.ExecutorServiceManager;
@@ -58,10 +62,28 @@ public class OnCompletionDefinition exte
private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
@XmlTransient
private ExecutorService executorService;
+ @XmlTransient
+ private Boolean routeScoped;
+ // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors
+ @XmlTransient
+ private final Map<String, Processor> onCompletions = new HashMap<String, Processor>();
public OnCompletionDefinition() {
}
+ public boolean isRouteScoped() {
+ // is context scoped by default
+ return routeScoped != null ? routeScoped : false;
+ }
+
+ public Processor getOnCompletion(String routeId) {
+ return onCompletions.get(routeId);
+ }
+
+ public Collection<Processor> getOnCompletions() {
+ return onCompletions.values();
+ }
+
@Override
public String toString() {
return "onCompletion[" + getOutputs() + "]";
@@ -84,15 +106,26 @@ public class OnCompletionDefinition exte
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
+ // assign whether this was a route scoped onCompletion or not
+ // we need to know this later when setting the parent, as only route scoped should have parent
+ // Note: this logic can possibly be removed when the Camel routing engine decides at runtime
+ // to apply onCompletion in a more dynamic fashion than current code base
+ // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime
+ if (routeScoped == null) {
+ routeScoped = super.getParent() != null;
+ }
+
if (isOnCompleteOnly() && isOnFailureOnly()) {
throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
}
Processor childProcessor = this.createChildProcessor(routeContext, true);
-
// wrap the on completion route in a unit of work processor
childProcessor = new UnitOfWorkProcessor(routeContext, childProcessor);
+ String id = routeContext.getRoute().getId();
+ onCompletions.put(id, childProcessor);
+
Predicate when = null;
if (onWhen != null) {
when = onWhen.getExpression().createPredicate(routeContext);
Modified: camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java?rev=1227549&r1=1227548&r2=1227549&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java Thu Jan 5 10:44:56 2012
@@ -304,4 +304,121 @@ public class ManagedRouteAddRemoveTest e
log.info("Shutting down...");
}
-}
\ No newline at end of file
+
+ public void testRouteAddRemoteRouteWithRecipientListAndRouteScopedOnCompletion() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+ template.sendBody("direct:start", "Hello World");
+ result.assertIsSatisfied();
+
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+
+ // number of producer caches
+ Set<ObjectName> names = mbeanServer.queryNames(on, null);
+ assertEquals(1, names.size());
+
+ log.info("Adding 2nd route");
+
+ // add a 2nd route
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:bar").routeId("bar")
+ .onCompletion()
+ .recipientList(header("done"))
+ .end().end()
+ .recipientList(header("bar"));
+ }
+ });
+
+ // and send a message to it
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
+ getMockEndpoint("mock:done").expectedMessageCount(1);
+
+ Map headers = new HashMap();
+ headers.put("done", "mock:done");
+ headers.put("bar", "mock:bar");
+ template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
+
+ assertMockEndpointsSatisfied();
+
+ // there should be two more producer caches
+ names = mbeanServer.queryNames(on, null);
+ assertEquals(3, names.size());
+
+ // now stop and remove the 2nd route
+ log.info("Stopping 2nd route");
+ context.stopRoute("bar");
+
+ log.info("Removing 2nd route");
+ boolean removed = context.removeRoute("bar");
+ assertTrue(removed);
+
+ // the producer cache should have been removed
+ on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+ names = mbeanServer.queryNames(on, null);
+ assertEquals(1, names.size());
+
+ log.info("Shutting down...");
+ }
+
+ public void testRouteAddRemoteRouteWithRecipientListAndContextScopedOnCompletion() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+ template.sendBody("direct:start", "Hello World");
+ result.assertIsSatisfied();
+
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+
+ // number of producer caches
+ Set<ObjectName> names = mbeanServer.queryNames(on, null);
+ assertEquals(1, names.size());
+
+ log.info("Adding 2nd route");
+
+ // add a 2nd route
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onCompletion()
+ .recipientList(header("done"))
+ .end();
+
+ from("direct:bar").routeId("bar")
+ .recipientList(header("bar"));
+ }
+ });
+
+ // and send a message to it
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
+ getMockEndpoint("mock:done").expectedMessageCount(1);
+
+ Map headers = new HashMap();
+ headers.put("done", "mock:done");
+ headers.put("bar", "mock:bar");
+ template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
+
+ assertMockEndpointsSatisfied();
+
+ // there should be two more producer caches
+ names = mbeanServer.queryNames(on, null);
+ assertEquals(3, names.size());
+
+ // now stop and remove the 2nd route
+ log.info("Stopping 2nd route");
+ context.stopRoute("bar");
+
+ log.info("Removing 2nd route");
+ boolean removed = context.removeRoute("bar");
+ assertTrue(removed);
+
+ // only the producer cache from the 2nd route should have been removed (the on completion is context scoped)
+ on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+ names = mbeanServer.queryNames(on, null);
+ assertEquals(2, names.size());
+
+ log.info("Shutting down...");
+ }
+}