You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/05 12:12:06 UTC

svn commit: r1227554 - in /camel/branches/camel-2.8.x: ./ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/test/java/org/apache/camel/management/

Author: davsclaus
Date: Thu Jan  5 11:12:05 2012
New Revision: 1227554

URL: http://svn.apache.org/viewvc?rev=1227554&view=rev
Log:
CAMEL-4842: Route scoped onCompletion should also shutdown child services when removing routes, to not leak resources.

Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
    camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Jan  5 11:12:05 2012
@@ -0,0 +1,2 @@
+/camel/branches/camel-2.9.x:1227549
+/camel/trunk:1227540

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1227554&r1=1227553&r2=1227554&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Thu Jan  5 11:12:05 2012
@@ -31,9 +31,11 @@ import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.Service;
+import org.apache.camel.model.OnCompletionDefinition;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.processor.OnCompletionProcessor;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.EventHelper;
@@ -303,6 +305,14 @@ public class RouteService extends Servic
                         services.add((Service) errorHandler);
                     }
                 }
+            } else if (output instanceof OnCompletionDefinition) {
+                OnCompletionDefinition onCompletionDefinition = (OnCompletionDefinition) output;
+                if (onCompletionDefinition.isRouteScoped()) {
+                    Processor onCompletionProcessor = onCompletionDefinition.getOnCompletion(route.getId());
+                    if (onCompletionProcessor != null && onCompletionProcessor instanceof Service) {
+                        services.add((Service) onCompletionProcessor);
+                    }
+                }
             }
         }
     }

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=1227554&r1=1227553&r2=1227554&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Thu Jan  5 11:12:05 2012
@@ -17,8 +17,11 @@
 package org.apache.camel.model;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -30,6 +33,7 @@ import javax.xml.bind.annotation.XmlTran
 
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.processor.FatalFallbackErrorHandler;
 import org.apache.camel.processor.OnCompletionProcessor;
 import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.RouteContext;
@@ -57,10 +61,28 @@ public class OnCompletionDefinition exte
     private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
     @XmlTransient
     private ExecutorService executorService;
+    @XmlTransient
+    private Boolean routeScoped;
+    // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors
+    @XmlTransient
+    private final Map<String, Processor> onCompletions = new HashMap<String, Processor>();
 
     public OnCompletionDefinition() {
     }
 
+    public boolean isRouteScoped() {
+        // is context scoped by default
+        return routeScoped != null ? routeScoped : false;
+    }
+
+    public Processor getOnCompletion(String routeId) {
+        return onCompletions.get(routeId);
+    }
+
+    public Collection<Processor> getOnCompletions() {
+        return onCompletions.values();
+    }
+
     @Override
     public String toString() {
         return "onCompletion[" + getOutputs() + "]";
@@ -83,15 +105,26 @@ public class OnCompletionDefinition exte
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
+        // assign whether this was a route scoped onCompletion or not
+        // we need to know this later when setting the parent, as only route scoped should have parent
+        // Note: this logic can possibly be removed when the Camel routing engine decides at runtime
+        // to apply onCompletion in a more dynamic fashion than current code base
+        // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime
+        if (routeScoped == null) {
+            routeScoped = super.getParent() != null;
+        }
+
         if (isOnCompleteOnly() && isOnFailureOnly()) {
             throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
         }
 
         Processor childProcessor = this.createChildProcessor(routeContext, true);
-
         // wrap the on completion route in a unit of work processor
         childProcessor = new UnitOfWorkProcessor(routeContext, childProcessor);
 
+        String id = routeContext.getRoute().getId();
+        onCompletions.put(id, childProcessor);
+
         Predicate when = null;
         if (onWhen != null) {
             when = onWhen.getExpression().createPredicate(routeContext);

Modified: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java?rev=1227554&r1=1227553&r2=1227554&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java Thu Jan  5 11:12:05 2012
@@ -304,4 +304,121 @@ public class ManagedRouteAddRemoveTest e
 
         log.info("Shutting down...");
     }
-}
\ No newline at end of file
+
+    public void testRouteAddRemoteRouteWithRecipientListAndRouteScopedOnCompletion() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        template.sendBody("direct:start", "Hello World");
+        result.assertIsSatisfied();
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+
+        // number of producer caches
+        Set<ObjectName> names = mbeanServer.queryNames(on, null);
+        assertEquals(1, names.size());
+
+        log.info("Adding 2nd route");
+
+        // add a 2nd route
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:bar").routeId("bar")
+                    .onCompletion()
+                        .recipientList(header("done"))
+                    .end().end()
+                    .recipientList(header("bar"));
+            }
+        });
+
+        // and send a message to it
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:done").expectedMessageCount(1);
+
+        Map headers = new HashMap();
+        headers.put("done", "mock:done");
+        headers.put("bar", "mock:bar");
+        template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
+
+        assertMockEndpointsSatisfied();
+
+        // there should be two more producer caches
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(3, names.size());
+
+        // now stop and remove the 2nd route
+        log.info("Stopping 2nd route");
+        context.stopRoute("bar");
+
+        log.info("Removing 2nd route");
+        boolean removed = context.removeRoute("bar");
+        assertTrue(removed);
+
+        // the producer cache should have been removed
+        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(1, names.size());
+
+        log.info("Shutting down...");
+    }
+
+    public void testRouteAddRemoteRouteWithRecipientListAndContextScopedOnCompletion() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        template.sendBody("direct:start", "Hello World");
+        result.assertIsSatisfied();
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+
+        // number of producer caches
+        Set<ObjectName> names = mbeanServer.queryNames(on, null);
+        assertEquals(1, names.size());
+
+        log.info("Adding 2nd route");
+
+        // add a 2nd route
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onCompletion()
+                    .recipientList(header("done"))
+                    .end();
+
+                from("direct:bar").routeId("bar")
+                    .recipientList(header("bar"));
+            }
+        });
+
+        // and send a message to it
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:done").expectedMessageCount(1);
+
+        Map headers = new HashMap();
+        headers.put("done", "mock:done");
+        headers.put("bar", "mock:bar");
+        template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
+
+        assertMockEndpointsSatisfied();
+
+        // there should be two more producer caches
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(3, names.size());
+
+        // now stop and remove the 2nd route
+        log.info("Stopping 2nd route");
+        context.stopRoute("bar");
+
+        log.info("Removing 2nd route");
+        boolean removed = context.removeRoute("bar");
+        assertTrue(removed);
+
+        // only the producer cache from the 2nd route should have been removed (the on completion is context scoped)
+        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(2, names.size());
+
+        log.info("Shutting down...");
+    }
+}