You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/09/17 11:41:27 UTC

[1/3] camel git commit: CAMEL-9143: Fixed JMX leak when using ServicePoolAware producers. Thanks to Bob Browning for the test case.

Repository: camel
Updated Branches:
  refs/heads/camel-2.14.x 843ef83f8 -> 3041cc643
  refs/heads/camel-2.15.x 185b61303 -> e1105553d
  refs/heads/master 15b81c4d4 -> 08077733a


CAMEL-9143: Fixed JMX leak when using ServicePoolAware producers. Thanks to Bob Browning for the test case.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/08077733
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/08077733
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/08077733

Branch: refs/heads/master
Commit: 08077733ad8ac74c7d1d73fa7052a89f92d6d21a
Parents: 15b81c4
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Sep 17 11:18:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Sep 17 11:18:01 2015 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/ProducerCache.java    |  19 +-
 .../camel/issues/ServicePoolAwareLeakyTest.java | 242 +++++++++++++++++++
 2 files changed, 257 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/08077733/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index 5b79954..586cc69 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -60,23 +60,31 @@ public class ProducerCache extends ServiceSupport {
     private boolean eventNotifierEnabled = true;
     private boolean extendedStatistics;
     private int maxCacheSize;
+    private boolean stopServicePool;
 
     public ProducerCache(Object source, CamelContext camelContext) {
         this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
     }
 
     public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
-        this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize));
+        this(source, camelContext, null, createLRUCache(cacheSize));
     }
 
     public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
-        this(source, camelContext, camelContext.getProducerServicePool(), cache);
+        this(source, camelContext, null, cache);
     }
 
     public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
         this.source = source;
         this.camelContext = camelContext;
-        this.pool = producerServicePool;
+        if (producerServicePool == null) {
+            // use shared producer pool whose lifecycle is managed by CamelContext
+            this.pool = camelContext.getProducerServicePool();
+            this.stopServicePool = false;
+        } else {
+            this.pool = producerServicePool;
+            this.stopServicePool = true;
+        }
         this.producers = cache;
         if (producers instanceof LRUCache) {
             maxCacheSize = ((LRUCache) producers).getMaxCacheSize();
@@ -468,7 +476,10 @@ public class ProducerCache extends ServiceSupport {
 
     protected void doStop() throws Exception {
         // when stopping we intend to shutdown
-        ServiceHelper.stopAndShutdownServices(statistics, pool);
+        ServiceHelper.stopAndShutdownService(statistics);
+        if (stopServicePool) {
+            ServiceHelper.stopAndShutdownService(pool);
+        }
         try {
             ServiceHelper.stopAndShutdownServices(producers.values());
         } finally {

http://git-wip-us.apache.org/repos/asf/camel/blob/08077733/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java b/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
new file mode 100644
index 0000000..9b3d947
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.ServicePoolAware;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.support.LifecycleStrategySupport;
+import org.apache.camel.support.ServiceSupport;
+
+public class ServicePoolAwareLeakyTest extends ContextTestSupport {
+
+    private static final String LEAKY_SIEVE_STABLE = "leaky://sieve-stable";
+    private static final String LEAKY_SIEVE_TRANSIENT = "leaky://sieve-transient";
+
+    /**
+     * Component that provides leaky producers.
+     */
+    private static class LeakySieveComponent extends DefaultComponent {
+        @Override
+        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+            return new LeakySieveEndpoint(uri);
+        }
+    }
+
+    /**
+     * Endpoint that provides leaky producers.
+     */
+    private static class LeakySieveEndpoint extends DefaultEndpoint {
+
+        private final String uri;
+
+        public LeakySieveEndpoint(String uri) {
+            this.uri = uri;
+        }
+
+        @Override
+        public Producer createProducer() throws Exception {
+            return new LeakySieveProducer(this);
+        }
+
+        @Override
+        public Consumer createConsumer(Processor processor) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isSingleton() {
+            return true;
+        }
+
+        @Override
+        protected String createEndpointUri() {
+            return uri;
+        }
+    }
+
+    /**
+     * Leaky producer - implements {@link ServicePoolAware}.
+     */
+    private static class LeakySieveProducer extends DefaultProducer implements ServicePoolAware {
+
+        public LeakySieveProducer(Endpoint endpoint) {
+            super(endpoint);
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // do nothing
+        }
+
+    }
+
+    @Override
+    protected boolean useJmx() {
+        // only occurs when using JMX as the GC root for the producer is through a ManagedProducer created by the
+        // context.addService() invocation
+        return true;
+    }
+
+    /**
+     * Returns true if verification of state should be performed during the test as opposed to at the end.
+     */
+    public boolean isFailFast() {
+        return false;
+    }
+
+    /**
+     * Returns true if during fast failure we should verify that the service pool remains in the started state.
+     */
+    public boolean isVerifyProducerServicePoolRemainsStarted() {
+        return false;
+    }
+
+    public void testForMemoryLeak() throws Exception {
+        registerLeakyComponent();
+
+        final Map<String, AtomicLong> references = new HashMap<>();
+
+        // track LeakySieveProducer lifecycle
+        context.addLifecycleStrategy(new LifecycleStrategySupport() {
+            @Override
+            public void onServiceAdd(CamelContext context, Service service, Route route) {
+                if (service instanceof LeakySieveProducer) {
+                    String key = ((LeakySieveProducer) service).getEndpoint().getEndpointKey();
+                    AtomicLong num = references.get(key);
+                    if (num == null) {
+                        num = new AtomicLong();
+                        references.put(key, num);
+                    }
+                    num.incrementAndGet();
+                }
+            }
+
+            @Override
+            public void onServiceRemove(CamelContext context, Service service, Route route) {
+                if (service instanceof LeakySieveProducer) {
+                    String key = ((LeakySieveProducer) service).getEndpoint().getEndpointKey();
+                    AtomicLong num = references.get(key);
+                    if (num == null) {
+                        num = new AtomicLong();
+                        references.put(key, num);
+                    }
+                    num.decrementAndGet();
+                }
+            }
+        });
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:sieve-transient")
+                        .id("sieve-transient")
+                        .to(LEAKY_SIEVE_TRANSIENT);
+
+                from("direct:sieve-stable")
+                        .id("sieve-stable")
+                        .to(LEAKY_SIEVE_STABLE);
+            }
+        });
+
+        context.start();
+
+        for (int i = 0; i < 1000; i++) {
+            ServiceSupport service = (ServiceSupport) context.getProducerServicePool();
+            assertEquals(ServiceStatus.Started, service.getStatus());
+            if (isFailFast()) {
+                assertEquals(2, context.getProducerServicePool().size());
+                assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals(1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            context.stopRoute("sieve-transient");
+
+            if (isFailFast()) {
+                assertEquals("Expected no service references to remain", 0, references.get(LEAKY_SIEVE_TRANSIENT));
+            }
+
+            if (isFailFast()) {
+                // looks like we cleared more than just our route, we've stopped and cleared the global ProducerServicePool
+                // since SendProcessor.stop() invokes ServiceHelper.stopServices(producerCache, producer); which in turn invokes
+                // ServiceHelper.stopAndShutdownService(pool);.
+                //
+                // Whilst stop on the SharedProducerServicePool is a NOOP shutdown is not and effects a stop of the pool.
+
+                if (isVerifyProducerServicePoolRemainsStarted()) {
+                    assertEquals(ServiceStatus.Started, service.getStatus());
+                }
+                assertEquals("Expected one stable producer to remain pooled", 1, context.getProducerServicePool().size());
+                assertEquals("Expected one stable producer to remain as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            // Send a body to verify behaviour of send producer after another route has been stopped
+            sendBody("direct:sieve-stable", "");
+
+            if (isFailFast()) {
+                // shared pool is used despite being 'Stopped'
+                if (isVerifyProducerServicePoolRemainsStarted()) {
+                    assertEquals(ServiceStatus.Started, service.getStatus());
+                }
+
+                assertEquals("Expected only stable producer in pool", 1, context.getProducerServicePool().size());
+                assertEquals("Expected no references to transient producer", 0, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals("Expected reference to stable producer", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            context.startRoute("sieve-transient");
+
+            // ok, back to normal
+            assertEquals(ServiceStatus.Started, service.getStatus());
+            if (isFailFast()) {
+                assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
+                assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+        }
+
+        if (!isFailFast()) {
+            assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
+
+            // if not fixed these will equal the number of iterations in the loop + 1
+            assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+            assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+        }
+    }
+
+    private void registerLeakyComponent() {
+        // register leaky component
+        context.addComponent("leaky", new LeakySieveComponent());
+    }
+
+}


[3/3] camel git commit: CAMEL-9143: Fixed JMX leak when using ServicePoolAware producers. Thanks to Bob Browning for the test case.

Posted by da...@apache.org.
CAMEL-9143: Fixed JMX leak when using ServicePoolAware producers. Thanks to Bob Browning for the test case.

Conflicts:
	camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3041cc64
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3041cc64
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3041cc64

Branch: refs/heads/camel-2.14.x
Commit: 3041cc643f456bbcdfed294071e0fd0099d6da8e
Parents: 843ef83
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Sep 17 11:18:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Sep 17 11:42:39 2015 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/ProducerCache.java    |  18 +-
 .../camel/issues/ServicePoolAwareLeakyTest.java | 242 +++++++++++++++++++
 2 files changed, 256 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3041cc64/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index 4ece29f..a770049 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -55,23 +55,31 @@ public class ProducerCache extends ServiceSupport {
     private final Map<String, Producer> producers;
     private final Object source;
     private boolean eventNotifierEnabled = true;
+    private boolean stopServicePool;
 
     public ProducerCache(Object source, CamelContext camelContext) {
         this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
     }
 
     public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
-        this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize));
+        this(source, camelContext, null, createLRUCache(cacheSize));
     }
 
     public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
-        this(source, camelContext, camelContext.getProducerServicePool(), cache);
+        this(source, camelContext, null, cache);
     }
 
     public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
         this.source = source;
         this.camelContext = camelContext;
-        this.pool = producerServicePool;
+        if (producerServicePool == null) {
+            // use shared producer pool whose lifecycle is managed by CamelContext
+            this.pool = camelContext.getProducerServicePool();
+            this.stopServicePool = false;
+        } else {
+            this.pool = producerServicePool;
+            this.stopServicePool = true;
+        }
         this.producers = cache;
     }
 
@@ -427,7 +435,9 @@ public class ProducerCache extends ServiceSupport {
 
     protected void doStop() throws Exception {
         // when stopping we intend to shutdown
-        ServiceHelper.stopAndShutdownService(pool);
+        if (stopServicePool) {
+            ServiceHelper.stopAndShutdownService(pool);
+        }
         try {
             ServiceHelper.stopAndShutdownServices(producers.values());
         } finally {

http://git-wip-us.apache.org/repos/asf/camel/blob/3041cc64/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java b/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
new file mode 100644
index 0000000..9b3d947
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.ServicePoolAware;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.support.LifecycleStrategySupport;
+import org.apache.camel.support.ServiceSupport;
+
+public class ServicePoolAwareLeakyTest extends ContextTestSupport {
+
+    private static final String LEAKY_SIEVE_STABLE = "leaky://sieve-stable";
+    private static final String LEAKY_SIEVE_TRANSIENT = "leaky://sieve-transient";
+
+    /**
+     * Component that provides leaky producers.
+     */
+    private static class LeakySieveComponent extends DefaultComponent {
+        @Override
+        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+            return new LeakySieveEndpoint(uri);
+        }
+    }
+
+    /**
+     * Endpoint that provides leaky producers.
+     */
+    private static class LeakySieveEndpoint extends DefaultEndpoint {
+
+        private final String uri;
+
+        public LeakySieveEndpoint(String uri) {
+            this.uri = uri;
+        }
+
+        @Override
+        public Producer createProducer() throws Exception {
+            return new LeakySieveProducer(this);
+        }
+
+        @Override
+        public Consumer createConsumer(Processor processor) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isSingleton() {
+            return true;
+        }
+
+        @Override
+        protected String createEndpointUri() {
+            return uri;
+        }
+    }
+
+    /**
+     * Leaky producer - implements {@link ServicePoolAware}.
+     */
+    private static class LeakySieveProducer extends DefaultProducer implements ServicePoolAware {
+
+        public LeakySieveProducer(Endpoint endpoint) {
+            super(endpoint);
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // do nothing
+        }
+
+    }
+
+    @Override
+    protected boolean useJmx() {
+        // only occurs when using JMX as the GC root for the producer is through a ManagedProducer created by the
+        // context.addService() invocation
+        return true;
+    }
+
+    /**
+     * Returns true if verification of state should be performed during the test as opposed to at the end.
+     */
+    public boolean isFailFast() {
+        return false;
+    }
+
+    /**
+     * Returns true if during fast failure we should verify that the service pool remains in the started state.
+     */
+    public boolean isVerifyProducerServicePoolRemainsStarted() {
+        return false;
+    }
+
+    public void testForMemoryLeak() throws Exception {
+        registerLeakyComponent();
+
+        final Map<String, AtomicLong> references = new HashMap<>();
+
+        // track LeakySieveProducer lifecycle
+        context.addLifecycleStrategy(new LifecycleStrategySupport() {
+            @Override
+            public void onServiceAdd(CamelContext context, Service service, Route route) {
+                if (service instanceof LeakySieveProducer) {
+                    String key = ((LeakySieveProducer) service).getEndpoint().getEndpointKey();
+                    AtomicLong num = references.get(key);
+                    if (num == null) {
+                        num = new AtomicLong();
+                        references.put(key, num);
+                    }
+                    num.incrementAndGet();
+                }
+            }
+
+            @Override
+            public void onServiceRemove(CamelContext context, Service service, Route route) {
+                if (service instanceof LeakySieveProducer) {
+                    String key = ((LeakySieveProducer) service).getEndpoint().getEndpointKey();
+                    AtomicLong num = references.get(key);
+                    if (num == null) {
+                        num = new AtomicLong();
+                        references.put(key, num);
+                    }
+                    num.decrementAndGet();
+                }
+            }
+        });
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:sieve-transient")
+                        .id("sieve-transient")
+                        .to(LEAKY_SIEVE_TRANSIENT);
+
+                from("direct:sieve-stable")
+                        .id("sieve-stable")
+                        .to(LEAKY_SIEVE_STABLE);
+            }
+        });
+
+        context.start();
+
+        for (int i = 0; i < 1000; i++) {
+            ServiceSupport service = (ServiceSupport) context.getProducerServicePool();
+            assertEquals(ServiceStatus.Started, service.getStatus());
+            if (isFailFast()) {
+                assertEquals(2, context.getProducerServicePool().size());
+                assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals(1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            context.stopRoute("sieve-transient");
+
+            if (isFailFast()) {
+                assertEquals("Expected no service references to remain", 0, references.get(LEAKY_SIEVE_TRANSIENT));
+            }
+
+            if (isFailFast()) {
+                // looks like we cleared more than just our route, we've stopped and cleared the global ProducerServicePool
+                // since SendProcessor.stop() invokes ServiceHelper.stopServices(producerCache, producer); which in turn invokes
+                // ServiceHelper.stopAndShutdownService(pool);.
+                //
+                // Whilst stop on the SharedProducerServicePool is a NOOP shutdown is not and effects a stop of the pool.
+
+                if (isVerifyProducerServicePoolRemainsStarted()) {
+                    assertEquals(ServiceStatus.Started, service.getStatus());
+                }
+                assertEquals("Expected one stable producer to remain pooled", 1, context.getProducerServicePool().size());
+                assertEquals("Expected one stable producer to remain as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            // Send a body to verify behaviour of send producer after another route has been stopped
+            sendBody("direct:sieve-stable", "");
+
+            if (isFailFast()) {
+                // shared pool is used despite being 'Stopped'
+                if (isVerifyProducerServicePoolRemainsStarted()) {
+                    assertEquals(ServiceStatus.Started, service.getStatus());
+                }
+
+                assertEquals("Expected only stable producer in pool", 1, context.getProducerServicePool().size());
+                assertEquals("Expected no references to transient producer", 0, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals("Expected reference to stable producer", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            context.startRoute("sieve-transient");
+
+            // ok, back to normal
+            assertEquals(ServiceStatus.Started, service.getStatus());
+            if (isFailFast()) {
+                assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
+                assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+        }
+
+        if (!isFailFast()) {
+            assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
+
+            // if not fixed these will equal the number of iterations in the loop + 1
+            assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+            assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+        }
+    }
+
+    private void registerLeakyComponent() {
+        // register leaky component
+        context.addComponent("leaky", new LeakySieveComponent());
+    }
+
+}


[2/3] camel git commit: CAMEL-9143: Fixed JMX leak when using ServicePoolAware producers. Thanks to Bob Browning for the test case.

Posted by da...@apache.org.
CAMEL-9143: Fixed JMX leak when using ServicePoolAware producers. Thanks to Bob Browning for the test case.

Conflicts:
	camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e1105553
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e1105553
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e1105553

Branch: refs/heads/camel-2.15.x
Commit: e1105553d16abf916b6d632f2bb831d133f76dc8
Parents: 185b613
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Sep 17 11:18:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Sep 17 11:41:54 2015 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/ProducerCache.java    |  18 +-
 .../camel/issues/ServicePoolAwareLeakyTest.java | 242 +++++++++++++++++++
 2 files changed, 256 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e1105553/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index 4ece29f..a770049 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -55,23 +55,31 @@ public class ProducerCache extends ServiceSupport {
     private final Map<String, Producer> producers;
     private final Object source;
     private boolean eventNotifierEnabled = true;
+    private boolean stopServicePool;
 
     public ProducerCache(Object source, CamelContext camelContext) {
         this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
     }
 
     public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
-        this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize));
+        this(source, camelContext, null, createLRUCache(cacheSize));
     }
 
     public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
-        this(source, camelContext, camelContext.getProducerServicePool(), cache);
+        this(source, camelContext, null, cache);
     }
 
     public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
         this.source = source;
         this.camelContext = camelContext;
-        this.pool = producerServicePool;
+        if (producerServicePool == null) {
+            // use shared producer pool whose lifecycle is managed by CamelContext
+            this.pool = camelContext.getProducerServicePool();
+            this.stopServicePool = false;
+        } else {
+            this.pool = producerServicePool;
+            this.stopServicePool = true;
+        }
         this.producers = cache;
     }
 
@@ -427,7 +435,9 @@ public class ProducerCache extends ServiceSupport {
 
     protected void doStop() throws Exception {
         // when stopping we intend to shutdown
-        ServiceHelper.stopAndShutdownService(pool);
+        if (stopServicePool) {
+            ServiceHelper.stopAndShutdownService(pool);
+        }
         try {
             ServiceHelper.stopAndShutdownServices(producers.values());
         } finally {

http://git-wip-us.apache.org/repos/asf/camel/blob/e1105553/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java b/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
new file mode 100644
index 0000000..9b3d947
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.ServicePoolAware;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.support.LifecycleStrategySupport;
+import org.apache.camel.support.ServiceSupport;
+
+public class ServicePoolAwareLeakyTest extends ContextTestSupport {
+
+    private static final String LEAKY_SIEVE_STABLE = "leaky://sieve-stable";
+    private static final String LEAKY_SIEVE_TRANSIENT = "leaky://sieve-transient";
+
+    /**
+     * Component that provides leaky producers.
+     */
+    private static class LeakySieveComponent extends DefaultComponent {
+        @Override
+        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+            return new LeakySieveEndpoint(uri);
+        }
+    }
+
+    /**
+     * Endpoint that provides leaky producers.
+     */
+    private static class LeakySieveEndpoint extends DefaultEndpoint {
+
+        private final String uri;
+
+        public LeakySieveEndpoint(String uri) {
+            this.uri = uri;
+        }
+
+        @Override
+        public Producer createProducer() throws Exception {
+            return new LeakySieveProducer(this);
+        }
+
+        @Override
+        public Consumer createConsumer(Processor processor) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isSingleton() {
+            return true;
+        }
+
+        @Override
+        protected String createEndpointUri() {
+            return uri;
+        }
+    }
+
+    /**
+     * Leaky producer - implements {@link ServicePoolAware}.
+     */
+    private static class LeakySieveProducer extends DefaultProducer implements ServicePoolAware {
+
+        public LeakySieveProducer(Endpoint endpoint) {
+            super(endpoint);
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // do nothing
+        }
+
+    }
+
+    @Override
+    protected boolean useJmx() {
+        // only occurs when using JMX as the GC root for the producer is through a ManagedProducer created by the
+        // context.addService() invocation
+        return true;
+    }
+
+    /**
+     * Returns true if verification of state should be performed during the test as opposed to at the end.
+     */
+    public boolean isFailFast() {
+        return false;
+    }
+
+    /**
+     * Returns true if during fast failure we should verify that the service pool remains in the started state.
+     */
+    public boolean isVerifyProducerServicePoolRemainsStarted() {
+        return false;
+    }
+
+    public void testForMemoryLeak() throws Exception {
+        registerLeakyComponent();
+
+        final Map<String, AtomicLong> references = new HashMap<>();
+
+        // track LeakySieveProducer lifecycle
+        context.addLifecycleStrategy(new LifecycleStrategySupport() {
+            @Override
+            public void onServiceAdd(CamelContext context, Service service, Route route) {
+                if (service instanceof LeakySieveProducer) {
+                    String key = ((LeakySieveProducer) service).getEndpoint().getEndpointKey();
+                    AtomicLong num = references.get(key);
+                    if (num == null) {
+                        num = new AtomicLong();
+                        references.put(key, num);
+                    }
+                    num.incrementAndGet();
+                }
+            }
+
+            @Override
+            public void onServiceRemove(CamelContext context, Service service, Route route) {
+                if (service instanceof LeakySieveProducer) {
+                    String key = ((LeakySieveProducer) service).getEndpoint().getEndpointKey();
+                    AtomicLong num = references.get(key);
+                    if (num == null) {
+                        num = new AtomicLong();
+                        references.put(key, num);
+                    }
+                    num.decrementAndGet();
+                }
+            }
+        });
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:sieve-transient")
+                        .id("sieve-transient")
+                        .to(LEAKY_SIEVE_TRANSIENT);
+
+                from("direct:sieve-stable")
+                        .id("sieve-stable")
+                        .to(LEAKY_SIEVE_STABLE);
+            }
+        });
+
+        context.start();
+
+        for (int i = 0; i < 1000; i++) {
+            ServiceSupport service = (ServiceSupport) context.getProducerServicePool();
+            assertEquals(ServiceStatus.Started, service.getStatus());
+            if (isFailFast()) {
+                assertEquals(2, context.getProducerServicePool().size());
+                assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals(1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            context.stopRoute("sieve-transient");
+
+            if (isFailFast()) {
+                assertEquals("Expected no service references to remain", 0, references.get(LEAKY_SIEVE_TRANSIENT).get());
+            }
+
+            if (isFailFast()) {
+                // looks like we cleared more than just our route: we've stopped and cleared the global ProducerServicePool,
+                // since SendProcessor.stop() invokes ServiceHelper.stopServices(producerCache, producer), which in turn invokes
+                // ServiceHelper.stopAndShutdownService(pool).
+                //
+                // Whilst stop on the SharedProducerServicePool is a NOOP, shutdown is not and effects a stop of the pool.
+
+                if (isVerifyProducerServicePoolRemainsStarted()) {
+                    assertEquals(ServiceStatus.Started, service.getStatus());
+                }
+                assertEquals("Expected one stable producer to remain pooled", 1, context.getProducerServicePool().size());
+                assertEquals("Expected one stable producer to remain as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            // Send a body to verify behaviour of send producer after another route has been stopped
+            sendBody("direct:sieve-stable", "");
+
+            if (isFailFast()) {
+                // shared pool is used despite being 'Stopped'
+                if (isVerifyProducerServicePoolRemainsStarted()) {
+                    assertEquals(ServiceStatus.Started, service.getStatus());
+                }
+
+                assertEquals("Expected only stable producer in pool", 1, context.getProducerServicePool().size());
+                assertEquals("Expected no references to transient producer", 0, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals("Expected reference to stable producer", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            context.startRoute("sieve-transient");
+
+            // ok, back to normal
+            assertEquals(ServiceStatus.Started, service.getStatus());
+            if (isFailFast()) {
+                assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
+                assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+        }
+
+        if (!isFailFast()) {
+            assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
+
+            // if not fixed these will equal the number of iterations in the loop + 1
+            assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+            assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+        }
+    }
+
+    private void registerLeakyComponent() {
+        // register leaky component
+        context.addComponent("leaky", new LeakySieveComponent());
+    }
+
+}