You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/09/17 11:41:29 UTC
[3/3] camel git commit: CAMEL-9143: Fixed JMX leak when using
ServicePoolAware producers. Thanks to Bob Browning for the test case.
CAMEL-9143: Fixed JMX leak when using ServicePoolAware producers. Thanks to Bob Browning for the test case.
Conflicts:
camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3041cc64
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3041cc64
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3041cc64
Branch: refs/heads/camel-2.14.x
Commit: 3041cc643f456bbcdfed294071e0fd0099d6da8e
Parents: 843ef83
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Sep 17 11:18:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Sep 17 11:42:39 2015 +0200
----------------------------------------------------------------------
.../org/apache/camel/impl/ProducerCache.java | 18 +-
.../camel/issues/ServicePoolAwareLeakyTest.java | 242 +++++++++++++++++++
2 files changed, 256 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3041cc64/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index 4ece29f..a770049 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -55,23 +55,31 @@ public class ProducerCache extends ServiceSupport {
private final Map<String, Producer> producers;
private final Object source;
private boolean eventNotifierEnabled = true;
+ private boolean stopServicePool;
public ProducerCache(Object source, CamelContext camelContext) {
this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
}
public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
- this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize));
+ this(source, camelContext, null, createLRUCache(cacheSize));
}
public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
- this(source, camelContext, camelContext.getProducerServicePool(), cache);
+ this(source, camelContext, null, cache);
}
public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
this.source = source;
this.camelContext = camelContext;
- this.pool = producerServicePool;
+ if (producerServicePool == null) {
+ // use shared producer pool which lifecycle is managed by CamelContext
+ this.pool = camelContext.getProducerServicePool();
+ this.stopServicePool = false;
+ } else {
+ this.pool = producerServicePool;
+ this.stopServicePool = true;
+ }
this.producers = cache;
}
@@ -427,7 +435,9 @@ public class ProducerCache extends ServiceSupport {
protected void doStop() throws Exception {
// when stopping we intend to shutdown
- ServiceHelper.stopAndShutdownService(pool);
+ if (stopServicePool) {
+ ServiceHelper.stopAndShutdownService(pool);
+ }
try {
ServiceHelper.stopAndShutdownServices(producers.values());
} finally {
http://git-wip-us.apache.org/repos/asf/camel/blob/3041cc64/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java b/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
new file mode 100644
index 0000000..9b3d947
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/ServicePoolAwareLeakyTest.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.ServicePoolAware;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.support.LifecycleStrategySupport;
+import org.apache.camel.support.ServiceSupport;
+
+/**
+ * Test for CAMEL-9143: {@link ServicePoolAware} producers must not pile up as services when a route is restarted.
+ */
+public class ServicePoolAwareLeakyTest extends ContextTestSupport {
+
+    private static final String LEAKY_SIEVE_STABLE = "leaky://sieve-stable";
+    private static final String LEAKY_SIEVE_TRANSIENT = "leaky://sieve-transient";
+
+    /**
+     * Component that provides leaky producers.
+     */
+    private static class LeakySieveComponent extends DefaultComponent {
+        @Override
+        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+            return new LeakySieveEndpoint(uri);
+        }
+    }
+
+    /**
+     * Endpoint that provides leaky producers.
+     */
+    private static class LeakySieveEndpoint extends DefaultEndpoint {
+
+        private final String uri;
+
+        public LeakySieveEndpoint(String uri) {
+            this.uri = uri;
+        }
+
+        @Override
+        public Producer createProducer() throws Exception {
+            return new LeakySieveProducer(this);
+        }
+
+        @Override
+        public Consumer createConsumer(Processor processor) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isSingleton() {
+            return true;
+        }
+
+        @Override
+        protected String createEndpointUri() {
+            return uri;
+        }
+    }
+
+    /**
+     * Leaky producer - implements {@link ServicePoolAware}.
+     */
+    private static class LeakySieveProducer extends DefaultProducer implements ServicePoolAware {
+
+        public LeakySieveProducer(Endpoint endpoint) {
+            super(endpoint);
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // do nothing
+        }
+    }
+
+    @Override
+    protected boolean useJmx() {
+        // only occurs when using JMX as the GC root for the producer is through a ManagedProducer created by the
+        // context.addService() invocation
+        return true;
+    }
+
+    /**
+     * Returns true if verification of state should be performed during the test as opposed to at the end.
+     */
+    public boolean isFailFast() {
+        return false;
+    }
+
+    /**
+     * Returns true if during fast failure we should verify that the service pool remains in the started state.
+     */
+    public boolean isVerifyProducerServicePoolRemainsStarted() {
+        return false;
+    }
+
+    /**
+     * Stops and restarts the transient route 1000 times and verifies one producer service remains per endpoint.
+     */
+    public void testForMemoryLeak() throws Exception {
+        registerLeakyComponent();
+
+        final Map<String, AtomicLong> references = new HashMap<>();
+
+        // track LeakySieveProducer lifecycle
+        context.addLifecycleStrategy(new LifecycleStrategySupport() {
+            @Override
+            public void onServiceAdd(CamelContext context, Service service, Route route) {
+                if (service instanceof LeakySieveProducer) {
+                    counterFor(references, (LeakySieveProducer) service).incrementAndGet();
+                }
+            }
+
+            @Override
+            public void onServiceRemove(CamelContext context, Service service, Route route) {
+                if (service instanceof LeakySieveProducer) {
+                    counterFor(references, (LeakySieveProducer) service).decrementAndGet();
+                }
+            }
+        });
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:sieve-transient")
+                    .id("sieve-transient")
+                    .to(LEAKY_SIEVE_TRANSIENT);
+
+                from("direct:sieve-stable")
+                    .id("sieve-stable")
+                    .to(LEAKY_SIEVE_STABLE);
+            }
+        });
+
+        context.start();
+
+        for (int i = 0; i < 1000; i++) {
+            ServiceSupport service = (ServiceSupport) context.getProducerServicePool();
+            assertEquals(ServiceStatus.Started, service.getStatus());
+            if (isFailFast()) {
+                assertEquals(2, context.getProducerServicePool().size());
+                assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals(1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            context.stopRoute("sieve-transient");
+
+            if (isFailFast()) {
+                assertEquals("Expected no service references to remain", 0, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                // looks like we cleared more than just our route, we've stopped and cleared the global ProducerServicePool
+                // since SendProcessor.stop() invokes ServiceHelper.stopServices(producerCache, producer); which in turn invokes
+                // ServiceHelper.stopAndShutdownService(pool);.
+                //
+                // Whilst stop on the SharedProducerServicePool is a NOOP, shutdown is not and effects a stop of the pool.
+
+                if (isVerifyProducerServicePoolRemainsStarted()) {
+                    assertEquals(ServiceStatus.Started, service.getStatus());
+                }
+                assertEquals("Expected one stable producer to remain pooled", 1, context.getProducerServicePool().size());
+                assertEquals("Expected one stable producer to remain as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            // Send a body to verify behaviour of send producer after another route has been stopped
+            sendBody("direct:sieve-stable", "");
+
+            if (isFailFast()) {
+                // shared pool is used despite being 'Stopped'
+                if (isVerifyProducerServicePoolRemainsStarted()) {
+                    assertEquals(ServiceStatus.Started, service.getStatus());
+                }
+
+                assertEquals("Expected only stable producer in pool", 1, context.getProducerServicePool().size());
+                assertEquals("Expected no references to transient producer", 0, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals("Expected reference to stable producer", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+
+            context.startRoute("sieve-transient");
+
+            // ok, back to normal
+            assertEquals(ServiceStatus.Started, service.getStatus());
+            if (isFailFast()) {
+                assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
+                assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+                assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+            }
+        }
+
+        if (!isFailFast()) {
+            assertEquals("Expected both producers in pool", 2, context.getProducerServicePool().size());
+
+            // if not fixed these will equal the number of iterations in the loop + 1
+            assertEquals("Expected one transient producer as service", 1, references.get(LEAKY_SIEVE_TRANSIENT).get());
+            assertEquals("Expected one stable producer as service", 1, references.get(LEAKY_SIEVE_STABLE).get());
+        }
+    }
+
+    private void registerLeakyComponent() {
+        // register leaky component
+        context.addComponent("leaky", new LeakySieveComponent());
+    }
+
+    /**
+     * Returns the counter kept for the producer's endpoint key, registering a zeroed counter on first use.
+     */
+    private static AtomicLong counterFor(Map<String, AtomicLong> references, LeakySieveProducer producer) {
+        String key = producer.getEndpoint().getEndpointKey();
+        if (!references.containsKey(key)) {
+            references.put(key, new AtomicLong());
+        }
+        return references.get(key);
+    }
+}