You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/04 15:48:03 UTC

svn commit: r1227175 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/util/ test/java/org/apache/camel/management/

Author: davsclaus
Date: Wed Jan  4 14:48:02 2012
New Revision: 1227175

URL: http://svn.apache.org/viewvc?rev=1227175&view=rev
Log:
CAMEL-4842: Removing a route should remove its producer cache from JMX, as well as from the services-to-close list on CamelContext, so that it does not eat up memory. Also shut down route-scoped error handlers when removing a route, so that they do not leak memory.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java?rev=1227175&r1=1227174&r2=1227175&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteService.java Wed Jan  4 14:48:02 2012
@@ -28,8 +28,11 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.Service;
+import org.apache.camel.model.OnExceptionDefinition;
+import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.RouteContext;
@@ -192,6 +195,8 @@ public class RouteService extends ChildS
             LOG.debug("Stopping services on route: {}", route.getId());
             // getServices will not add services again
             List<Service> services = route.getServices();
+            // also get route scoped services
+            doGetRouteScopedServices(services, route);
 
             // gather list of services to stop as we need to start child services as well
             Set<Service> list = new LinkedHashSet<Service>();
@@ -228,12 +233,15 @@ public class RouteService extends ChildS
         for (Route route : routes) {
             LOG.debug("Shutting down services on route: {}", route.getId());
             List<Service> services = route.getServices();
+            // also get route scoped child services
+            doGetRouteScopedServices(services, route);
 
             // gather list of services to stop as we need to start child services as well
             Set<Service> list = new LinkedHashSet<Service>();
             for (Service service : services) {
                 doGetChildServices(list, service);
             }
+
             // shutdown services
             stopChildService(route, list, true);
 
@@ -280,7 +288,6 @@ public class RouteService extends ChildS
 
     @Override
     protected void doResume() throws Exception {
-
         // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy
         // to safely suspend and resume
         for (Route route : routes) {
@@ -319,7 +326,24 @@ public class RouteService extends ChildS
     }
 
     /**
-     * Need to recursive start child services for routes
+     * Gather all route scoped services from the given route, such as route scoped error handler.
+     */
+    private void doGetRouteScopedServices(List<Service> services, Route route) {
+        for (ProcessorDefinition output : route.getRouteContext().getRoute().getOutputs()) {
+            if (output instanceof OnExceptionDefinition) {
+                OnExceptionDefinition onExceptionDefinition = (OnExceptionDefinition) output;
+                if (onExceptionDefinition.isRouteScoped()) {
+                    Processor errorHandler = onExceptionDefinition.getErrorHandler(route.getId());
+                    if (errorHandler != null && errorHandler instanceof Service) {
+                        services.add((Service) errorHandler);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Gather all child services by navigating the service to recursively gather all child services.
      */
     private static void doGetChildServices(Set<Service> services, Service service) throws Exception {
         services.add(service);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1227175&r1=1227174&r2=1227175&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Jan  4 14:48:02 2012
@@ -942,8 +942,15 @@ public class MulticastProcessor extends 
         return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
     }
 
+    @Override
     protected void doStop() throws Exception {
         ServiceHelper.stopServices(processors, errorHandlers);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(processors, errorHandlers);
+        // only clear error handlers when shutting down
         errorHandlers.clear();
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1227175&r1=1227174&r2=1227175&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Wed Jan  4 14:48:02 2012
@@ -1056,6 +1056,6 @@ public abstract class RedeliveryErrorHan
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopServices(deadLetter, output, outputAsync);
+        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=1227175&r1=1227174&r2=1227175&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Wed Jan  4 14:48:02 2012
@@ -151,7 +151,7 @@ public final class ServiceHelper {
     /**
      * Stops and shutdowns all of the given services, throwing the first exception caught
      */
-    public static void stopAndShutdownService(Service value) throws Exception {
+    public static void stopAndShutdownService(Object value) throws Exception {
         stopService(value);
 
         // then try to shutdown

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java?rev=1227175&r1=1227174&r2=1227175&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java Wed Jan  4 14:48:02 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.management;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -184,4 +186,122 @@ public class ManagedRouteAddRemoveTest e
         log.info("Shutting down...");
     }
 
+    public void testRouteAddRemoteRouteWithRecipientListAndRouteScopedOnException() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        template.sendBody("direct:start", "Hello World");
+        result.assertIsSatisfied();
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+
+        // number of producer caches
+        Set<ObjectName> names = mbeanServer.queryNames(on, null);
+        assertEquals(1, names.size());
+
+        log.info("Adding 2nd route");
+
+        // add a 2nd route
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:bar").routeId("bar")
+                    .onException(Exception.class)
+                        .handled(true)
+                        .recipientList(header("error"))
+                    .end().end()
+                    .recipientList(header("bar")).throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+
+        // and send a message to it
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:error").expectedMessageCount(1);
+
+        Map headers = new HashMap();
+        headers.put("error", "mock:error");
+        headers.put("bar", "mock:bar");
+        template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
+
+        assertMockEndpointsSatisfied();
+
+        // there should be two more producer caches
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(3, names.size());
+
+        // now stop and remove the 2nd route
+        log.info("Stopping 2nd route");
+        context.stopRoute("bar");
+
+        log.info("Removing 2nd route");
+        boolean removed = context.removeRoute("bar");
+        assertTrue(removed);
+
+        // the producer cache should have been removed
+        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(1, names.size());
+
+        log.info("Shutting down...");
+    }
+
+    public void testRouteAddRemoteRouteWithRecipientListAndContextScopedOnException() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        template.sendBody("direct:start", "Hello World");
+        result.assertIsSatisfied();
+
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+
+        // number of producer caches
+        Set<ObjectName> names = mbeanServer.queryNames(on, null);
+        assertEquals(1, names.size());
+
+        log.info("Adding 2nd route");
+
+        // add a 2nd route
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                    .handled(true)
+                    .recipientList(header("error"))
+                .end();
+
+                from("direct:bar").routeId("bar")
+                    .recipientList(header("bar")).throwException(new IllegalArgumentException("Forced"));
+            }
+        });
+
+        // and send a message to it
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:error").expectedMessageCount(1);
+
+        Map headers = new HashMap();
+        headers.put("error", "mock:error");
+        headers.put("bar", "mock:bar");
+        template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
+
+        assertMockEndpointsSatisfied();
+
+        // there should be two more producer caches
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(3, names.size());
+
+        // now stop and remove the 2nd route
+        log.info("Stopping 2nd route");
+        context.stopRoute("bar");
+
+        log.info("Removing 2nd route");
+        boolean removed = context.removeRoute("bar");
+        assertTrue(removed);
+
+        // only the producer cache from the 2nd route should have been removed (the on exception becomes context scoped)
+        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
+        names = mbeanServer.queryNames(on, null);
+        assertEquals(2, names.size());
+
+        log.info("Shutting down...");
+    }
 }
\ No newline at end of file