You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Claus Ibsen (JIRA)" <ji...@apache.org> on 2015/09/17 10:12:45 UTC

[jira] [Assigned] (CAMEL-9143) Producers that implement the ServicePoolAware interface cause memory leak due to JMX references

     [ https://issues.apache.org/jira/browse/CAMEL-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen reassigned CAMEL-9143:
----------------------------------

    Assignee: Claus Ibsen

> Producers that implement the ServicePoolAware interface cause memory leak due to JMX references
> -----------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-9143
>                 URL: https://issues.apache.org/jira/browse/CAMEL-9143
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.14.1, 2.14.2, 2.15.0, 2.15.1
>            Reporter: Bob Browning
>            Assignee: Claus Ibsen
>             Fix For: 2.16.0, 2.15.4
>
>
> h4. Description
> Producer instances that implement the ServicePoolAware interface will leak memory if their route is stopped, with new producers being leaked every time the route is started/stopped.
> Known implementations that are affected are RemoteFileProducer (ftp, sftp) and Mina2Producer.
> This is due to the behaviour that the SendProcessor which when the route is stopped it shuts down it's `producerCache` instance.
> {code}
>     protected void doStop() throws Exception {
>         ServiceHelper.stopServices(producerCache, producer);
>     }
> {code}
> this in turn calls `stopAndShutdownService(pool)` which will call stop on the SharedProducerServicePool instance which is a NOOP however it also calls shutdown which effects a stop of the global pool (this stops all the registered services and then clears the pool.
> {code}
>     protected void doStop() throws Exception {
>         // when stopping we intend to shutdown
>         ServiceHelper.stopAndShutdownService(pool);
>         try {
>             ServiceHelper.stopAndShutdownServices(producers.values());
>         } finally {
>             // ensure producers are removed, and also from JMX
>             for (Producer producer : producers.values()) {
>                 getCamelContext().removeService(producer);
>             }
>         }
>         producers.clear();
>     }
> {code}
> However no call to `context.removeService(Producer) is called for the entries from the pool only those singleton instances that were in the `producers` map hence the JMX `ManagedProducer` that is created when `doGetProducer` invokes {code}                getCamelContext().addService(answer, false);
> {code} is never removed. 
> Since the global pool is empty when the next request to get a producer is called a new producer is created, jmx wrapper and all, whilst the old instance remains orphaned retaining any objects that pertain to that instance.
> One workaround is for the producer to call {code}getEndpoint().getCamelContext().removeService(this){code} in it's stop method, however this is fairly obscure and it would probably be better to invoke removal of the producer when it is removed from the shared pool.
> Another issue of note is that when a route is shutdown that contains a SendProcessor due to the shutdown invocation on the SharedProcessorServicePool the global pool is cleared of `everything` and remains in `Stopped` state until another route starts it (although it is still accessed and used whilst in the `Stopped` state).
> h4. Impact
> For general use where there is no dynamic creation or passivation of routes this issue should be minimal, however in our use case where the routes are not static, there is a certain amount of recreation of routes as customer endpoints change and there is a need to passivate idle routes this causes a considerable memory leak (via SFTP in particular).
> h4. Test Case
> {code}
> package org.apache.camel.component;
> import com.google.common.util.concurrent.AtomicLongMap;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Consumer;
> import org.apache.camel.Endpoint;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.Producer;
> import org.apache.camel.Route;
> import org.apache.camel.Service;
> import org.apache.camel.ServicePoolAware;
> import org.apache.camel.ServiceStatus;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultComponent;
> import org.apache.camel.impl.DefaultEndpoint;
> import org.apache.camel.impl.DefaultProducer;
> import org.apache.camel.support.LifecycleStrategySupport;
> import org.apache.camel.support.ServiceSupport;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
> import java.util.Map;
> import static com.google.common.base.Preconditions.checkNotNull;
> /**
>  * Test memory behaviour of producers using {@link ServicePoolAware} when using JMX.
>  */
> public class ServicePoolAwareLeakyTest extends CamelTestSupport {
>   private static final String LEAKY_SIEVE_STABLE = "leaky://sieve-stable?plugged=true";
>   private static final String LEAKY_SIEVE_TRANSIENT = "leaky://sieve-transient?plugged=true";
>   private static boolean isPatchApplied() {
>     return Boolean.parseBoolean(System.getProperty("patchApplied", "false"));
>   }
>   /**
>    * Component that provides leaks producers.
>    */
>   private static class LeakySieveComponent extends DefaultComponent {
>     @Override
>     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
>       boolean plugged = "true".equalsIgnoreCase((String) parameters.remove("plugged"));
>       return new LeakySieveEndpoint(uri, isPatchApplied() && plugged);
>     }
>   }
>   /**
>    * Endpoint that provides leaky producers.
>    */
>   private static class LeakySieveEndpoint extends DefaultEndpoint {
>     private final String uri;
>     private final boolean plugged;
>     public LeakySieveEndpoint(String uri, boolean plugged) {
>       this.uri = checkNotNull(uri, "uri must not be null");
>       this.plugged = plugged;
>     }
>     @Override
>     public Producer createProducer() throws Exception {
>       return new LeakySieveProducer(this, plugged);
>     }
>     @Override
>     public Consumer createConsumer(Processor processor) throws Exception {
>       throw new UnsupportedOperationException();
>     }
>     @Override
>     public boolean isSingleton() {
>       return true;
>     }
>     @Override
>     protected String createEndpointUri() {
>       return uri;
>     }
>   }
>   /**
>    * Leaky producer - implements {@link ServicePoolAware}.
>    */
>   private static class LeakySieveProducer extends DefaultProducer implements ServicePoolAware {
>     private final boolean plugged;
>     public LeakySieveProducer(Endpoint endpoint, boolean plugged) {
>       super(endpoint);
>       this.plugged = plugged;
>     }
>     @Override
>     public void process(Exchange exchange) throws Exception {
>       // do nothing
>     }
>     @Override
>     protected void doStop() throws Exception {
>       super.doStop();
>       //noinspection ConstantConditions
>       if (plugged) {
>         // need to remove self from services since we are ServicePoolAware this will not be handled for us otherwise we
>         // leak memory
>         getEndpoint().getCamelContext().removeService(this);
>       }
>     }
>   }
>   @Override
>   protected boolean useJmx() {
>     // only occurs when using JMX as the GC root for the producer is through a ManagedProducer created by the
>     // context.addService() invocation
>     return true;
>   }
>   /**
>    * Returns true if verification of state should be performed during the test as opposed to at the end.
>    */
>   public boolean isFailFast() {
>     return false;
>   }
>   /**
>    * Returns true if during fast failure we should verify that the service pool remains in the started state.
>    */
>   public boolean isVerifyProducerServicePoolRemainsStarted() {
>     return false;
>   }
>   @Override
>   public boolean isUseAdviceWith() {
>     return true;
>   }
>   @Test
>   public void testForMemoryLeak() throws Exception {
>     registerLeakyComponent();
>     final AtomicLongMap<String> references = AtomicLongMap.create();
>     // track LeakySieveProducer lifecycle
>     context.addLifecycleStrategy(new LifecycleStrategySupport() {
>       @Override
>       public void onServiceAdd(CamelContext context, Service service, Route route) {
>         if (service instanceof LeakySieveProducer) {
>           references.incrementAndGet(((LeakySieveProducer) service).getEndpoint().getEndpointKey());
>         }
>       }
>       @Override
>       public void onServiceRemove(CamelContext context, Service service, Route route) {
>         if (service instanceof LeakySieveProducer) {
>           references.decrementAndGet(((LeakySieveProducer) service).getEndpoint().getEndpointKey());
>         }
>       }
>     });
>     context.addRoutes(new RouteBuilder() {
>       @Override
>       public void configure() throws Exception {
>         from("direct:sieve-transient")
>             .id("sieve-transient")
>             .to(LEAKY_SIEVE_TRANSIENT);
>         from("direct:sieve-stable")
>             .id("sieve-stable")
>             .to(LEAKY_SIEVE_STABLE);
>       }
>     });
>     context.start();
>     for (int i = 0; i < 1000; i++) {
>       ServiceSupport service = (ServiceSupport) context.getProducerServicePool();
>       assertEquals(ServiceStatus.Started, service.getStatus());
>       if (isFailFast()) {
>         assertEquals(2, context.getProducerServicePool().size());
>         assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT));
>         assertEquals(1, references.get(LEAKY_SIEVE_STABLE));
>       }
>       context.stopRoute("sieve-transient");
>       if (isFailFast()) {
>         assertEquals("Expected no service references to remain", 0, references.get(LEAKY_SIEVE_TRANSIENT));
>       }
>       if (isFailFast()) {
>         // looks like we cleared more than just our route, we've stopped and cleared the global ProducerServicePool
>         // since SendProcessor.stop() invokes ServiceHelper.stopServices(producerCache, producer); which in turn invokes
>         // ServiceHelper.stopAndShutdownService(pool);.
>         //
>         // Whilst stop on the SharedProducerServicePool is a NOOP shutdown is not and effects a stop of the pool.
>         if (isVerifyProducerServicePoolRemainsStarted()) {
>          assertEquals(ServiceStatus.Started, service.getStatus());
>         }
>         assertEquals("Expected one stable producer to remain pooled", 1, context.getProducerServicePool().size());
>         assertEquals("Expected one stable producer to remain as service", 1, references.get(LEAKY_SIEVE_STABLE));
>       }
>       // Send a body to verify behaviour of send producer after another route has been stopped
>       sendBody("direct:sieve-stable", "");
>       if (isFailFast()) {
>         // shared pool is used despite being 'Stopped'
>         if (isVerifyProducerServicePoolRemainsStarted()) {
>           assertEquals(ServiceStatus.Started, service.getStatus());
>         }
>         assertEquals("Expected only stable producer in pool", 1, context.getProducerServicePool().size());
>         assertEquals("Expected no references to transient producer", 0, references.get(LEAKY_SIEVE_TRANSIENT));
>         assertEquals("Expected reference to stable producer", 1, references.get(LEAKY_SIEVE_STABLE));
>       }
>       context.startRoute("sieve-transient");
>       // ok, back to normal
>       assertEquals(ServiceStatus.Started, service.getStatus());
>       if (isFailFast()) {
>         assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
>         assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT));
>         assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE));
>       }
>     }
>     if (!isFailFast()) {
>       assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
>       // if not fixed these will equal the number of iterations in the loop + 1
>       assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT));
>       assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE));
>     }
>   }
>   private void registerLeakyComponent() {
>     // register leaky component
>     context.addComponent("leaky", new LeakySieveComponent());
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)