You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/24 15:12:08 UTC

[camel] branch exchange-factory updated (049fbe9 -> 95790be)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 049fbe9  CAMEL-16222: PooledExchangeFactory experiment
     new 161ed69  CAMEL-16222: PooledExchangeFactory experiment
     new 95790be  CAMEL-16222: PooledExchangeFactory experiment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/camel/ExtendedCamelContext.java     |  11 ++
 .../java/org/apache/camel/spi/ExchangeFactory.java |  73 ++++++++-
 .../apache/camel/spi/ExchangeFactoryManager.java   |  88 ++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    |  26 ++-
 .../camel/impl/engine/DefaultExchangeFactory.java  | 161 +++++++++++++++++-
 .../impl/engine/DefaultExchangeFactoryManager.java | 181 +++++++++++++++++++++
 .../camel/impl/engine/PooledExchangeFactory.java   | 110 +++++--------
 .../camel/impl/engine/SimpleCamelContext.java      |   6 +
 .../camel/impl/ExtendedCamelContextConfigurer.java |   6 +
 .../camel/impl/lw/LightweightCamelContext.java     |  11 ++
 .../impl/lw/LightweightRuntimeCamelContext.java    |  13 ++
 .../api/management/mbean/CamelOpenMBeanTypes.java  |  15 ++
 .../mbean/ManagedExchangeFactoryManagerMBean.java  |  62 +++++++
 .../management/JmxManagementLifecycleStrategy.java |   4 +
 .../mbean/ManagedExchangeFactoryManager.java       | 144 ++++++++++++++++
 .../management/ManagedNonManagedServiceTest.java   |   2 +-
 .../management/ManagedPooledExchangeTest.java}     |  63 ++++++-
 ...edProducerRouteAddRemoveRegisterAlwaysTest.java |   2 +-
 .../management/ManagedRouteAddRemoveTest.java      |   2 +-
 .../src/test/resources/log4j2.properties           |   2 +-
 .../org/apache/camel/support/DefaultConsumer.java  |   5 +-
 21 files changed, 896 insertions(+), 91 deletions(-)
 create mode 100644 core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
 create mode 100644 core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
 create mode 100644 core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
 create mode 100644 core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
 copy core/{camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java => camel-management/src/test/java/org/apache/camel/management/ManagedPooledExchangeTest.java} (62%)


[camel] 01/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 161ed6966a7c8185fc7818b51d0bc58fde19ec1b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Feb 24 12:36:54 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../org/apache/camel/ExtendedCamelContext.java     |  11 ++
 .../java/org/apache/camel/spi/ExchangeFactory.java |  73 +++++++++-
 .../apache/camel/spi/ExchangeFactoryManager.java   |  78 ++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    |  26 +++-
 .../camel/impl/engine/DefaultExchangeFactory.java  | 161 ++++++++++++++++++++-
 .../impl/engine/DefaultExchangeFactoryManager.java | 101 +++++++++++++
 .../camel/impl/engine/PooledExchangeFactory.java   | 110 ++++++--------
 .../camel/impl/engine/SimpleCamelContext.java      |   6 +
 .../camel/impl/ExtendedCamelContextConfigurer.java |   6 +
 .../camel/impl/lw/LightweightCamelContext.java     |  11 ++
 .../impl/lw/LightweightRuntimeCamelContext.java    |  13 ++
 .../api/management/mbean/CamelOpenMBeanTypes.java  |  13 ++
 .../mbean/ManagedExchangeFactoryManagerMBean.java  |  44 ++++++
 .../management/JmxManagementLifecycleStrategy.java |   4 +
 .../mbean/ManagedExchangeFactoryManager.java       | 110 ++++++++++++++
 .../org/apache/camel/support/DefaultConsumer.java  |   5 +-
 16 files changed, 693 insertions(+), 79 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 6cc0fc5..4689df9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -37,6 +37,7 @@ import org.apache.camel.spi.DeferServiceFactory;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.EndpointUriFactory;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
 import org.apache.camel.spi.HeadersMapFactory;
@@ -225,6 +226,16 @@ public interface ExtendedCamelContext extends CamelContext {
     void setExchangeFactory(ExchangeFactory exchangeFactory);
 
     /**
+     * Gets the exchange factory manager to use.
+     */
+    ExchangeFactoryManager getExchangeFactoryManager();
+
+    /**
+     * Sets a custom exchange factory manager to use.
+     */
+    void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager);
+
+    /**
      * Returns the bean post processor used to do any bean customization.
      *
      * @return the bean post processor.
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index e2047e1..ac8d7e6 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -16,9 +16,12 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Service;
 
 /**
  * Factory used by {@link Consumer} to create Camel {@link Exchange} holding the incoming message received by the
@@ -33,7 +36,50 @@ import org.apache.camel.Exchange;
  * The factory is pluggable which allows to use different strategies. The default factory will create a new
  * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
  */
-public interface ExchangeFactory {
+public interface ExchangeFactory extends Service, CamelContextAware, NonManagedService {
+
+    /**
+     * Utilization statistics of this factory.
+     */
+    interface Statistics {
+
+        /**
+         * Number of new exchanges created.
+         */
+        long getCreatedCounter();
+
+        /**
+         * Number of exchanges acquired (reused) when using pooled factory.
+         */
+        long getAcquiredCounter();
+
+        /**
+         * Number of exchanges released back to pool
+         */
+        long getReleasedCounter();
+
+        /**
+         * Number of exchanges discarded (thrown away) such as if no space in cache pool.
+         */
+        long getDiscardedCounter();
+
+        /**
+         * Reset the counters
+         */
+        void reset();
+
+        /**
+         * Whether statistics is enabled.
+         */
+        boolean isStatisticsEnabled();
+
+        /**
+         * Sets whether statistics is enabled.
+         *
+         * @param statisticsEnabled <tt>true</tt> to enable
+         */
+        void setStatisticsEnabled(boolean statisticsEnabled);
+    }
 
     /**
      * Service factory key.
@@ -41,6 +87,11 @@ public interface ExchangeFactory {
     String FACTORY = "exchange-factory";
 
     /**
+     * The consumer using this factory.
+     */
+    Consumer getConsumer();
+
+    /**
      * Creates a new {@link ExchangeFactory} that is private for the given consumer.
      *
      * @param  consumer the consumer that will use the created {@link ExchangeFactory}
@@ -79,6 +130,11 @@ public interface ExchangeFactory {
     int getCapacity();
 
     /**
+     * The current number of exchanges in the pool
+     */
+    int getSize();
+
+    /**
      * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
      */
     void setCapacity(int capacity);
@@ -93,4 +149,19 @@ public interface ExchangeFactory {
      */
     void setStatisticsEnabled(boolean statisticsEnabled);
 
+    /**
+     * Reset the statistics
+     */
+    void resetStatistics();
+
+    /**
+     * Purges the internal cache (if pooled)
+     */
+    void purge();
+
+    /**
+     * Gets the usage statistics
+     */
+    Statistics getStatistics();
+
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
new file mode 100644
index 0000000..a46884a
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.Collection;
+
+import org.apache.camel.StaticService;
+
+/**
+ * Manages {@link ExchangeFactory}.
+ */
+public interface ExchangeFactoryManager extends StaticService {
+
+    /**
+     * Adds the {@link ExchangeFactory} to be managed.
+     *
+     * @param exchangeFactory the exchange factory
+     */
+    void addExchangeFactory(ExchangeFactory exchangeFactory);
+
+    /**
+     * Removes the {@link ExchangeFactory} from being managed (such as when a route is stopped/removed) or during
+     * shutdown.
+     *
+     * @param exchangeFactory the exchange factory
+     */
+    void removeExchangeFactory(ExchangeFactory exchangeFactory);
+
+    /**
+     * Returns a read-only view of the managed factories.
+     */
+    Collection<ExchangeFactory> getExchangeFactories();
+
+    /**
+     * Number of consumers currently being managed
+     */
+    int getSize();
+
+    /**
+     * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
+     */
+    int getCapacity();
+
+    /**
+     * Whether statistics is enabled.
+     */
+    boolean isStatisticsEnabled();
+
+    /**
+     * Sets whether statistics is enabled.
+     */
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
+    /**
+     * Reset the statistics
+     */
+    void resetStatistics();
+
+    /**
+     * Purges the internal caches (if pooled)
+     */
+    void purge();
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 22195da..305bfa9 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -102,6 +102,7 @@ import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.EndpointUriFactory;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
@@ -265,6 +266,7 @@ public abstract class AbstractCamelContext extends BaseService
     private volatile String version;
     private volatile PropertiesComponent propertiesComponent;
     private volatile CamelContextNameStrategy nameStrategy;
+    private volatile ExchangeFactoryManager exchangeFactoryManager = new DefaultExchangeFactoryManager();
     private volatile ExchangeFactory exchangeFactory;
     private volatile ReactiveExecutor reactiveExecutor;
     private volatile ManagementNameStrategy managementNameStrategy;
@@ -3684,6 +3686,7 @@ public abstract class AbstractCamelContext extends BaseService
         reactiveExecutor = null;
         asyncProcessorAwaitManager = null;
         exchangeFactory = null;
+        exchangeFactoryManager = null;
     }
 
     /**
@@ -4657,7 +4660,26 @@ public abstract class AbstractCamelContext extends BaseService
 
     @Override
     public void setExchangeFactory(ExchangeFactory exchangeFactory) {
-        this.exchangeFactory = doAddService(exchangeFactory);
+        // automatic inject camel context
+        exchangeFactory.setCamelContext(this);
+        this.exchangeFactory = exchangeFactory;
+    }
+
+    @Override
+    public ExchangeFactoryManager getExchangeFactoryManager() {
+        if (exchangeFactoryManager == null) {
+            synchronized (lock) {
+                if (exchangeFactoryManager == null) {
+                    setExchangeFactoryManager(createExchangeFactoryManager());
+                }
+            }
+        }
+        return exchangeFactoryManager;
+    }
+
+    @Override
+    public void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager) {
+        this.exchangeFactoryManager = doAddService(exchangeFactoryManager);
     }
 
     @Override
@@ -4754,6 +4776,8 @@ public abstract class AbstractCamelContext extends BaseService
 
     protected abstract ExchangeFactory createExchangeFactory();
 
+    protected abstract ExchangeFactoryManager createExchangeFactoryManager();
+
     protected abstract HealthCheckRegistry createHealthCheckRegistry();
 
     protected abstract ReactiveExecutor createReactiveExecutor();
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
index 1ca6740..ae7e601 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
@@ -16,20 +16,51 @@
  */
 package org.apache.camel.impl.engine;
 
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default {@link ExchangeFactory} that creates a new {@link Exchange} instance.
  */
-public final class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware {
+public class DefaultExchangeFactory extends ServiceSupport implements ExchangeFactory, CamelContextAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultExchangeFactory.class);
 
-    private CamelContext camelContext;
+    final UtilizationStatistics statistics = new UtilizationStatistics();
+    final Consumer consumer;
+    CamelContext camelContext;
+    ExchangeFactoryManager exchangeFactoryManager;
+
+    public DefaultExchangeFactory() {
+        this.consumer = null;
+    }
+
+    public DefaultExchangeFactory(Consumer consumer) {
+        this.consumer = consumer;
+    }
+
+    @Override
+    protected void doBuild() throws Exception {
+        this.exchangeFactoryManager = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
+    }
+
+    @Override
+    public Consumer getConsumer() {
+        return consumer;
+    }
 
     @Override
     public CamelContext getCamelContext() {
@@ -43,28 +74,44 @@ public final class DefaultExchangeFactory implements ExchangeFactory, CamelConte
 
     @Override
     public ExchangeFactory newExchangeFactory(Consumer consumer) {
-        // we just use a shared factory
-        return this;
+        DefaultExchangeFactory answer = new DefaultExchangeFactory(consumer);
+        answer.setStatisticsEnabled(statistics.isStatisticsEnabled());
+        answer.setCamelContext(camelContext);
+        return answer;
     }
 
     @Override
     public Exchange create(boolean autoRelease) {
+        if (statistics.isStatisticsEnabled()) {
+            statistics.created.increment();
+        }
         return new DefaultExchange(camelContext);
     }
 
     @Override
     public Exchange create(Endpoint fromEndpoint, boolean autoRelease) {
+        if (statistics.isStatisticsEnabled()) {
+            statistics.created.increment();
+        }
         return new DefaultExchange(fromEndpoint);
     }
 
     @Override
+    public boolean release(Exchange exchange) {
+        if (statistics.isStatisticsEnabled()) {
+            statistics.released.increment();
+        }
+        return true;
+    }
+
+    @Override
     public boolean isStatisticsEnabled() {
-        return false;
+        return statistics.isStatisticsEnabled();
     }
 
     @Override
     public void setStatisticsEnabled(boolean statisticsEnabled) {
-        // not in use
+        statistics.setStatisticsEnabled(statisticsEnabled);
     }
 
     @Override
@@ -73,7 +120,109 @@ public final class DefaultExchangeFactory implements ExchangeFactory, CamelConte
     }
 
     @Override
+    public int getSize() {
+        return 0;
+    }
+
+    @Override
     public void setCapacity(int capacity) {
         // not in use
     }
+
+    @Override
+    public void resetStatistics() {
+        statistics.reset();
+    }
+
+    @Override
+    public void purge() {
+        // not in use
+    }
+
+    @Override
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        exchangeFactoryManager.addExchangeFactory(this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        exchangeFactoryManager.removeExchangeFactory(this);
+        logUsageSummary(LOG, "DefaultExchangeFactory", 0);
+        statistics.reset();
+    }
+
+    void logUsageSummary(Logger log, String name, int pooled) {
+        if (statistics.isStatisticsEnabled() && consumer != null) {
+            // only log if there is any usage
+            long created = statistics.getCreatedCounter();
+            long acquired = statistics.getAcquiredCounter();
+            long released = statistics.getReleasedCounter();
+            long discarded = statistics.getDiscardedCounter();
+            boolean shouldLog = pooled > 0 || created > 0 || acquired > 0 || released > 0 || discarded > 0;
+            if (shouldLog) {
+                String uri = consumer.getEndpoint().getEndpointBaseUri();
+                uri = URISupport.sanitizeUri(uri);
+
+                LOG.info("{} ({}) usage [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]",
+                        name, uri, pooled, created, acquired, released, discarded);
+            }
+        }
+    }
+
+    /**
+     * Represents utilization statistics
+     */
+    final class UtilizationStatistics implements ExchangeFactory.Statistics {
+
+        private boolean statisticsEnabled;
+
+        final LongAdder created = new LongAdder();
+        final LongAdder acquired = new LongAdder();
+        final LongAdder released = new LongAdder();
+        final LongAdder discarded = new LongAdder();
+
+        @Override
+        public void reset() {
+            created.reset();
+            acquired.reset();
+            released.reset();
+            discarded.reset();
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            return created.longValue();
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            return acquired.longValue();
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            return released.longValue();
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            return discarded.longValue();
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+    }
+
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
new file mode 100644
index 0000000..7b5ffc1
--- /dev/null
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
+import org.apache.camel.support.service.ServiceSupport;
+
+public class DefaultExchangeFactoryManager extends ServiceSupport implements ExchangeFactoryManager, CamelContextAware {
+
+    private CamelContext camelContext;
+    private final Map<Consumer, ExchangeFactory> factories = new ConcurrentHashMap<>();
+    private int capacity;
+    private boolean statisticsEnabled;
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public void addExchangeFactory(ExchangeFactory exchangeFactory) {
+        factories.put(exchangeFactory.getConsumer(), exchangeFactory);
+        // same for all factories
+        capacity = exchangeFactory.getCapacity();
+        statisticsEnabled = exchangeFactory.isStatisticsEnabled();
+    }
+
+    @Override
+    public void removeExchangeFactory(ExchangeFactory exchangeFactory) {
+        factories.remove(exchangeFactory.getConsumer());
+    }
+
+    @Override
+    public Collection<ExchangeFactory> getExchangeFactories() {
+        return Collections.unmodifiableCollection(factories.values());
+    }
+
+    @Override
+    public int getSize() {
+        return factories.size();
+    }
+
+    @Override
+    public int getCapacity() {
+        return capacity;
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return statisticsEnabled;
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        this.statisticsEnabled = statisticsEnabled;
+        for (ExchangeFactory ef : factories.values()) {
+            ef.setStatisticsEnabled(statisticsEnabled);
+        }
+    }
+
+    @Override
+    public void resetStatistics() {
+        factories.values().forEach(ExchangeFactory::resetStatistics);
+    }
+
+    @Override
+    public void purge() {
+        factories.values().forEach(ExchangeFactory::purge);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        factories.clear();
+    }
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index dcd0a03..e83fbfd 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -18,102 +18,85 @@ package org.apache.camel.impl.engine;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.NonManagedService;
 import org.apache.camel.PooledExchange;
-import org.apache.camel.StaticService;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultPooledExchange;
-import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a pool.
  */
-public final class PooledExchangeFactory extends ServiceSupport
-        implements ExchangeFactory, CamelContextAware, StaticService, NonManagedService {
+public final class PooledExchangeFactory extends DefaultExchangeFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class);
 
     private final ReleaseOnDoneTask onDone = new ReleaseOnDoneTask();
     private final Consumer consumer;
     private BlockingQueue<Exchange> pool;
-    private final AtomicLong acquired = new AtomicLong();
-    private final AtomicLong created = new AtomicLong();
-    private final AtomicLong released = new AtomicLong();
-    private final AtomicLong discarded = new AtomicLong();
-
-    private CamelContext camelContext;
     private int capacity = 100;
-    private boolean statisticsEnabled;
 
     public PooledExchangeFactory() {
         this.consumer = null;
     }
 
-    private PooledExchangeFactory(Consumer consumer, CamelContext camelContext, boolean statisticsEnabled, int capacity) {
+    public PooledExchangeFactory(Consumer consumer) {
         this.consumer = consumer;
-        this.camelContext = camelContext;
-        this.statisticsEnabled = statisticsEnabled;
-        this.capacity = capacity;
     }
 
     @Override
     protected void doBuild() throws Exception {
+        super.doBuild();
         this.pool = new ArrayBlockingQueue<>(capacity);
     }
 
     @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
+    public Consumer getConsumer() {
+        return consumer;
     }
 
     @Override
     public ExchangeFactory newExchangeFactory(Consumer consumer) {
-        return new PooledExchangeFactory(consumer, camelContext, statisticsEnabled, capacity);
+        PooledExchangeFactory answer = new PooledExchangeFactory(consumer);
+        answer.setCamelContext(camelContext);
+        answer.setCapacity(capacity);
+        answer.setStatisticsEnabled(isStatisticsEnabled());
+        return answer;
     }
 
     public int getCapacity() {
         return capacity;
     }
 
-    public void setCapacity(int capacity) {
-        this.capacity = capacity;
-    }
-
-    public boolean isStatisticsEnabled() {
-        return statisticsEnabled;
+    @Override
+    public int getSize() {
+        if (pool != null) {
+            return pool.size();
+        } else {
+            return 0;
+        }
     }
 
-    public void setStatisticsEnabled(boolean statisticsEnabled) {
-        this.statisticsEnabled = statisticsEnabled;
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
     }
 
     @Override
     public Exchange create(boolean autoRelease) {
         Exchange exchange = pool.poll();
         if (exchange == null) {
-            if (statisticsEnabled) {
-                created.incrementAndGet();
-            }
             // create a new exchange as there was no free from the pool
             exchange = createPooledExchange(null, autoRelease);
+            if (statistics.isStatisticsEnabled()) {
+                statistics.created.increment();
+            }
         } else {
-            if (statisticsEnabled) {
-                acquired.incrementAndGet();
+            if (statistics.isStatisticsEnabled()) {
+                statistics.acquired.increment();
             }
             // reset exchange for reuse
             PooledExchange ee = exchange.adapt(PooledExchange.class);
@@ -126,14 +109,14 @@ public final class PooledExchangeFactory extends ServiceSupport
     public Exchange create(Endpoint fromEndpoint, boolean autoRelease) {
         Exchange exchange = pool.poll();
         if (exchange == null) {
-            if (statisticsEnabled) {
-                created.incrementAndGet();
-            }
             // create a new exchange as there was no free from the pool
             exchange = new DefaultPooledExchange(fromEndpoint);
+            if (statistics.isStatisticsEnabled()) {
+                statistics.created.increment();
+            }
         } else {
-            if (statisticsEnabled) {
-                acquired.incrementAndGet();
+            if (statistics.isStatisticsEnabled()) {
+                statistics.acquired.increment();
             }
             // reset exchange for reuse
             PooledExchange ee = exchange.adapt(PooledExchange.class);
@@ -154,17 +137,17 @@ public final class PooledExchangeFactory extends ServiceSupport
             // only release back in pool if reset was success
             boolean inserted = pool.offer(exchange);
 
-            if (statisticsEnabled) {
+            if (statistics.isStatisticsEnabled()) {
                 if (inserted) {
-                    released.incrementAndGet();
+                    statistics.released.increment();
                 } else {
-                    discarded.incrementAndGet();
+                    statistics.discarded.increment();
                 }
             }
             return inserted;
         } catch (Exception e) {
-            if (statisticsEnabled) {
-                discarded.incrementAndGet();
+            if (statistics.isStatisticsEnabled()) {
+                statistics.discarded.increment();
             }
             LOG.debug("Error resetting exchange: {}. This exchange is discarded.", exchange);
             return false;
@@ -187,25 +170,18 @@ public final class PooledExchangeFactory extends ServiceSupport
     }
 
     @Override
-    protected void doStop() throws Exception {
+    public void purge() {
         pool.clear();
+    }
 
-        if (statisticsEnabled && consumer != null) {
-            // only log if there is any usage
-            boolean shouldLog = created.get() > 0 || acquired.get() > 0 || released.get() > 0 || discarded.get() > 0;
-            if (shouldLog) {
-                String uri = consumer.getEndpoint().getEndpointBaseUri();
-                uri = URISupport.sanitizeUri(uri);
-
-                LOG.info("PooledExchangeFactory ({}) usage [created: {}, reused: {}, released: {}, discarded: {}]",
-                        uri, created.get(), acquired.get(), released.get(), discarded.get());
-            }
-        }
+    @Override
+    protected void doStop() throws Exception {
+        exchangeFactoryManager.removeExchangeFactory(this);
+        logUsageSummary(LOG, "PooledExchangeFactory", pool.size());
+        statistics.reset();
+        pool.clear();
 
-        created.set(0);
-        acquired.set(0);
-        released.set(0);
-        discarded.set(0);
+        // do not call super
     }
 
     private final class ReleaseOnDoneTask implements PooledExchange.OnDoneTask {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index b91710c..90f7a62 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -42,6 +42,7 @@ import org.apache.camel.spi.DataFormatResolver;
 import org.apache.camel.spi.DeferServiceFactory;
 import org.apache.camel.spi.EndpointRegistry;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
@@ -555,6 +556,11 @@ public class SimpleCamelContext extends AbstractCamelContext {
     }
 
     @Override
+    protected ExchangeFactoryManager createExchangeFactoryManager() {
+        return new DefaultExchangeFactoryManager();
+    }
+
+    @Override
     protected ReactiveExecutor createReactiveExecutor() {
         Optional<ReactiveExecutor> result = ResolverHelper.resolveService(
                 getCamelContextReference(),
diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index b6e2a47..fc3b1b3 100644
--- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -67,6 +67,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "EventNotificationApplicable": target.setEventNotificationApplicable(property(camelContext, boolean.class, value)); return true;
         case "exchangefactory":
         case "ExchangeFactory": target.setExchangeFactory(property(camelContext, org.apache.camel.spi.ExchangeFactory.class, value)); return true;
+        case "exchangefactorymanager":
+        case "ExchangeFactoryManager": target.setExchangeFactoryManager(property(camelContext, org.apache.camel.spi.ExchangeFactoryManager.class, value)); return true;
         case "executorservicemanager":
         case "ExecutorServiceManager": target.setExecutorServiceManager(property(camelContext, org.apache.camel.spi.ExecutorServiceManager.class, value)); return true;
         case "factoryfinderresolver":
@@ -236,6 +238,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "EventNotificationApplicable": return boolean.class;
         case "exchangefactory":
         case "ExchangeFactory": return org.apache.camel.spi.ExchangeFactory.class;
+        case "exchangefactorymanager":
+        case "ExchangeFactoryManager": return org.apache.camel.spi.ExchangeFactoryManager.class;
         case "executorservicemanager":
         case "ExecutorServiceManager": return org.apache.camel.spi.ExecutorServiceManager.class;
         case "factoryfinderresolver":
@@ -406,6 +410,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "EventNotificationApplicable": return target.isEventNotificationApplicable();
         case "exchangefactory":
         case "ExchangeFactory": return target.getExchangeFactory();
+        case "exchangefactorymanager":
+        case "ExchangeFactoryManager": return target.getExchangeFactoryManager();
         case "executorservicemanager":
         case "ExecutorServiceManager": return target.getExecutorServiceManager();
         case "factoryfinderresolver":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
index 7732b81..85f642e 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
@@ -89,6 +89,7 @@ import org.apache.camel.spi.EndpointRegistry;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.EndpointUriFactory;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
@@ -1452,6 +1453,16 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam
     }
 
     @Override
+    public ExchangeFactoryManager getExchangeFactoryManager() {
+        return getExtendedCamelContext().getExchangeFactoryManager();
+    }
+
+    @Override
+    public void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager) {
+        getExtendedCamelContext().setExchangeFactoryManager(exchangeFactoryManager);
+    }
+
+    @Override
     public ReactiveExecutor getReactiveExecutor() {
         return getExtendedCamelContext().getReactiveExecutor();
     }
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
index cc96226..931073b 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
@@ -86,6 +86,7 @@ import org.apache.camel.spi.EndpointRegistry;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.EndpointUriFactory;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
@@ -169,6 +170,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
     private final BeanIntrospection beanIntrospection;
     private final HeadersMapFactory headersMapFactory;
     private final ExchangeFactory exchangeFactory;
+    private final ExchangeFactoryManager exchangeFactoryManager;
     private final ReactiveExecutor reactiveExecutor;
     private final AsyncProcessorAwaitManager asyncProcessorAwaitManager;
     private final ExecutorServiceManager executorServiceManager;
@@ -214,6 +216,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
         beanIntrospection = context.adapt(ExtendedCamelContext.class).getBeanIntrospection();
         headersMapFactory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory();
         exchangeFactory = context.adapt(ExtendedCamelContext.class).getExchangeFactory();
+        exchangeFactoryManager = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
         reactiveExecutor = context.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         asyncProcessorAwaitManager = context.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         executorServiceManager = context.getExecutorServiceManager();
@@ -1567,6 +1570,16 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
     }
 
     @Override
+    public ExchangeFactoryManager getExchangeFactoryManager() {
+        return exchangeFactoryManager;
+    }
+
+    @Override
+    public void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void setExchangeFactory(ExchangeFactory exchangeFactory) {
         throw new UnsupportedOperationException();
     }
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index dbc5188..96fbcd8 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -65,6 +65,19 @@ public final class CamelOpenMBeanTypes {
                 new OpenType[] { SimpleType.STRING, SimpleType.BOOLEAN, SimpleType.BOOLEAN });
     }
 
+    public static TabularType listExchangeFactoryTabularType() throws OpenDataException {
+        CompositeType ct = listExchangeFactoryCompositeType();
+        return new TabularType("listExchangeFactory", "Lists all the exchange factories", ct, new String[] { "url" });
+    }
+
+    public static CompositeType listExchangeFactoryCompositeType() throws OpenDataException {
+        return new CompositeType(
+                "factories", "Factories",
+                new String[] { "url", "capacity", "pooled", "created", "released" },
+                new String[] { "Url", "Capacity", "Pooled", "Created", "Released" },
+                new OpenType[] { SimpleType.STRING, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.LONG, SimpleType.LONG });
+    }
+
     public static TabularType listRuntimeEndpointsTabularType() throws OpenDataException {
         CompositeType ct = listRuntimeEndpointsCompositeType();
         return new TabularType(
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
new file mode 100644
index 0000000..bedebc9
--- /dev/null
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+
+public interface ManagedExchangeFactoryManagerMBean extends ManagedServiceMBean {
+
+    @ManagedAttribute(description = "Max capacity per consumer for exchange pooling")
+    Integer getCapacity();
+
+    @ManagedAttribute(description = "Whether statistics is enabled")
+    Boolean getStatisticsEnabled();
+
+    @ManagedAttribute(description = "Whether statistics is enabled")
+    void setStatisticsEnabled(Boolean statisticsEnabled);
+
+    @ManagedOperation(description = "Reset statistics")
+    void resetStatistics();
+
+    @ManagedOperation(description = "Purges the pool")
+    void purge();
+
+    @ManagedOperation(description = "Lists all the consumers and their pooling statistics")
+    TabularData listPools();
+
+}
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
index aae3e70..943584f 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
@@ -60,6 +60,7 @@ import org.apache.camel.management.mbean.ManagedCamelContext;
 import org.apache.camel.management.mbean.ManagedConsumerCache;
 import org.apache.camel.management.mbean.ManagedEndpoint;
 import org.apache.camel.management.mbean.ManagedEndpointRegistry;
+import org.apache.camel.management.mbean.ManagedExchangeFactoryManager;
 import org.apache.camel.management.mbean.ManagedInflightRepository;
 import org.apache.camel.management.mbean.ManagedProducerCache;
 import org.apache.camel.management.mbean.ManagedRestRegistry;
@@ -86,6 +87,7 @@ import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.EndpointRegistry;
 import org.apache.camel.spi.EventNotifier;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.InternalProcessor;
 import org.apache.camel.spi.LifecycleStrategy;
@@ -537,6 +539,8 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li
             answer = new ManagedConsumerCache(context, (ConsumerCache) service);
         } else if (service instanceof ProducerCache) {
             answer = new ManagedProducerCache(context, (ProducerCache) service);
+        } else if (service instanceof ExchangeFactoryManager) {
+            answer = new ManagedExchangeFactoryManager(context, (ExchangeFactoryManager) service);
         } else if (service instanceof EndpointRegistry) {
             answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service);
         } else if (service instanceof BeanIntrospection) {
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
new file mode 100644
index 0000000..bab7a0d
--- /dev/null
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.Collection;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
+import org.apache.camel.api.management.mbean.ManagedExchangeFactoryManagerMBean;
+import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
+import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.URISupport;
+
+@ManagedResource(description = "Managed ExchangeFactory")
+public class ManagedExchangeFactoryManager extends ManagedService implements ManagedExchangeFactoryManagerMBean {
+
+    private ExchangeFactoryManager exchangeFactoryManager;
+    private boolean sanitize;
+
+    public ManagedExchangeFactoryManager(CamelContext context, ExchangeFactoryManager exchangeFactoryManager) {
+        super(context, exchangeFactoryManager);
+        this.exchangeFactoryManager = exchangeFactoryManager;
+    }
+
+    @Override
+    public void init(ManagementStrategy strategy) {
+        super.init(strategy);
+        sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false;
+    }
+
+    @Override
+    public Integer getCapacity() {
+        return exchangeFactoryManager.getCapacity();
+    }
+
+    @Override
+    public Boolean getStatisticsEnabled() {
+        return exchangeFactoryManager.isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(Boolean statisticsEnabled) {
+        exchangeFactoryManager.setStatisticsEnabled(statisticsEnabled);
+    }
+
+    @Override
+    public void resetStatistics() {
+        exchangeFactoryManager.resetStatistics();
+    }
+
+    @Override
+    public void purge() {
+        exchangeFactoryManager.purge();
+    }
+
+    @Override
+    public TabularData listPools() {
+        try {
+            TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listExchangeFactoryTabularType());
+            Collection<ExchangeFactory> factories = exchangeFactoryManager.getExchangeFactories();
+            for (ExchangeFactory ef : factories) {
+                CompositeType ct = CamelOpenMBeanTypes.listExchangeFactoryCompositeType();
+                String url = ef.getConsumer().getEndpoint().getEndpointUri();
+                if (sanitize) {
+                    url = URISupport.sanitizeUri(url);
+                }
+
+                int capacity = ef.getCapacity();
+                int size = ef.getSize();
+                long created = 0;
+                long released = 0;
+                if (ef.isStatisticsEnabled()) {
+                    created = ef.getStatistics().getCreatedCounter();
+                    released = ef.getStatistics().getReleasedCounter();
+                }
+
+                CompositeData data = new CompositeDataSupport(
+                        ct, new String[] { "url", "capacity", "pooled", "created", "released" },
+                        new Object[] { url, capacity, size, created, released });
+                answer.put(data);
+            }
+            return answer;
+        } catch (Exception e) {
+            throw RuntimeCamelException.wrapRuntimeCamelException(e);
+        }
+    }
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index f27f56a..a57ef22 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -26,7 +26,6 @@ import org.apache.camel.PooledExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.RouteAware;
-import org.apache.camel.Service;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
@@ -174,9 +173,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
     @Override
     protected void doBuild() throws Exception {
         super.doBuild();
-        if (exchangeFactory instanceof Service) {
-            ((Service) exchangeFactory).build();
-        }
+        exchangeFactory.build();
     }
 
     @Override


[camel] 02/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 95790be3d45867af4c64f42fb9ac518580e150a4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Feb 24 16:08:21 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/spi/ExchangeFactoryManager.java   |  12 +-
 .../camel/impl/engine/AbstractCamelContext.java    |   2 +-
 .../impl/engine/DefaultExchangeFactoryManager.java |  84 ++++++++++++-
 .../api/management/mbean/CamelOpenMBeanTypes.java  |   8 +-
 .../mbean/ManagedExchangeFactoryManagerMBean.java  |  22 +++-
 .../mbean/ManagedExchangeFactoryManager.java       |  40 ++++++-
 .../management/ManagedNonManagedServiceTest.java   |   2 +-
 .../management/ManagedPooledExchangeTest.java      | 132 +++++++++++++++++++++
 ...edProducerRouteAddRemoveRegisterAlwaysTest.java |   2 +-
 .../management/ManagedRouteAddRemoveTest.java      |   2 +-
 .../src/test/resources/log4j2.properties           |   2 +-
 11 files changed, 292 insertions(+), 16 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
index a46884a..1145129 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
@@ -48,7 +48,7 @@ public interface ExchangeFactoryManager extends StaticService {
     /**
      * Number of consumers currently being managed
      */
-    int getSize();
+    int getConsumerCounter();
 
     /**
      * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
@@ -56,6 +56,11 @@ public interface ExchangeFactoryManager extends StaticService {
     int getCapacity();
 
     /**
+     * Number of exchanges currently being pooled (if pooling is in use)
+     */
+    int getPooledCounter();
+
+    /**
      * Whether statistics is enabled.
      */
     boolean isStatisticsEnabled();
@@ -75,4 +80,9 @@ public interface ExchangeFactoryManager extends StaticService {
      */
     void purge();
 
+    /**
+     * Aggregated statistics for all the managed exchange factories
+     */
+    ExchangeFactory.Statistics getStatistics();
+
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 305bfa9..c706c88 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -266,7 +266,7 @@ public abstract class AbstractCamelContext extends BaseService
     private volatile String version;
     private volatile PropertiesComponent propertiesComponent;
     private volatile CamelContextNameStrategy nameStrategy;
-    private volatile ExchangeFactoryManager exchangeFactoryManager = new DefaultExchangeFactoryManager();
+    private volatile ExchangeFactoryManager exchangeFactoryManager;
     private volatile ExchangeFactory exchangeFactory;
     private volatile ReactiveExecutor reactiveExecutor;
     private volatile ManagementNameStrategy managementNameStrategy;
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
index 7b5ffc1..5d1b0c1 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
@@ -30,8 +30,9 @@ import org.apache.camel.support.service.ServiceSupport;
 
 public class DefaultExchangeFactoryManager extends ServiceSupport implements ExchangeFactoryManager, CamelContextAware {
 
-    private CamelContext camelContext;
     private final Map<Consumer, ExchangeFactory> factories = new ConcurrentHashMap<>();
+    private final UtilizationStatistics statistics = new UtilizationStatistics();
+    private CamelContext camelContext;
     private int capacity;
     private boolean statisticsEnabled;
 
@@ -62,7 +63,7 @@ public class DefaultExchangeFactoryManager extends ServiceSupport implements Exc
     }
 
     @Override
-    public int getSize() {
+    public int getConsumerCounter() {
         return factories.size();
     }
 
@@ -72,6 +73,15 @@ public class DefaultExchangeFactoryManager extends ServiceSupport implements Exc
     }
 
     @Override
+    public int getPooledCounter() {
+        int counter = 0;
+        for (ExchangeFactory ef : factories.values()) {
+            counter += ef.getSize();
+        }
+        return counter;
+    }
+
+    @Override
     public boolean isStatisticsEnabled() {
         return statisticsEnabled;
     }
@@ -95,6 +105,76 @@ public class DefaultExchangeFactoryManager extends ServiceSupport implements Exc
     }
 
     @Override
+    public ExchangeFactory.Statistics getStatistics() {
+        return statistics;
+    }
+
+    /**
+     * Represents utilization statistics
+     */
+    final class UtilizationStatistics implements ExchangeFactory.Statistics {
+
+        @Override
+        public void reset() {
+            resetStatistics();
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            long answer = 0;
+            if (statisticsEnabled) {
+                for (ExchangeFactory ef : factories.values()) {
+                    answer += ef.getStatistics().getCreatedCounter();
+                }
+            }
+            return answer;
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            long answer = 0;
+            if (statisticsEnabled) {
+                for (ExchangeFactory ef : factories.values()) {
+                    answer += ef.getStatistics().getAcquiredCounter();
+                }
+            }
+            return answer;
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            long answer = 0;
+            if (statisticsEnabled) {
+                for (ExchangeFactory ef : factories.values()) {
+                    answer += ef.getStatistics().getReleasedCounter();
+                }
+            }
+            return answer;
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            long answer = 0;
+            if (statisticsEnabled) {
+                for (ExchangeFactory ef : factories.values()) {
+                    answer += ef.getStatistics().getDiscardedCounter();
+                }
+            }
+            return answer;
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    @Override
     protected void doShutdown() throws Exception {
         factories.clear();
     }
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index 96fbcd8..e332344 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -73,9 +73,11 @@ public final class CamelOpenMBeanTypes {
     public static CompositeType listExchangeFactoryCompositeType() throws OpenDataException {
         return new CompositeType(
                 "factories", "Factories",
-                new String[] { "url", "capacity", "pooled", "created", "released" },
-                new String[] { "Url", "Capacity", "Pooled", "Created", "Released" },
-                new OpenType[] { SimpleType.STRING, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.LONG, SimpleType.LONG });
+                new String[] { "url", "capacity", "pooled", "created", "acquired", "released", "discarded" },
+                new String[] { "Url", "Capacity", "Pooled", "Created", "Acquired", "Released", "Discarded" },
+                new OpenType[] {
+                        SimpleType.STRING, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.LONG, SimpleType.LONG,
+                        SimpleType.LONG, SimpleType.LONG });
     }
 
     public static TabularType listRuntimeEndpointsTabularType() throws OpenDataException {
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
index bedebc9..f2d57fd 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
@@ -23,6 +23,9 @@ import org.apache.camel.api.management.ManagedOperation;
 
 public interface ManagedExchangeFactoryManagerMBean extends ManagedServiceMBean {
 
+    @ManagedAttribute(description = "Number of consumers managed")
+    Integer getConsumerCounter();
+
     @ManagedAttribute(description = "Max capacity per consumer for exchange pooling")
     Integer getCapacity();
 
@@ -38,7 +41,22 @@ public interface ManagedExchangeFactoryManagerMBean extends ManagedServiceMBean
     @ManagedOperation(description = "Purges the pool")
     void purge();
 
-    @ManagedOperation(description = "Lists all the consumers and their pooling statistics")
-    TabularData listPools();
+    @ManagedAttribute(description = "Total number of currently pooled exchanges (if pooling is in use)")
+    Integer getTotalPooled();
+
+    @ManagedAttribute(description = "Total number of new exchanges created")
+    Long getTotalCreated();
+
+    @ManagedAttribute(description = "Total number of exchanges reused (if pooling is in use)")
+    Long getTotalAcquired();
+
+    @ManagedAttribute(description = "Total number of exchanges released back to the pool")
+    Long getTotalReleased();
+
+    @ManagedAttribute(description = "Total number of exchanges discarded (such as when capacity is full)")
+    Long getTotalDiscarded();
+
+    @ManagedOperation(description = "Lists all the statistics in tabular form")
+    TabularData listStatistics();
 
 }
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
index bab7a0d..dea0a7c 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
@@ -52,6 +52,16 @@ public class ManagedExchangeFactoryManager extends ManagedService implements Man
     }
 
     @Override
+    public Integer getConsumerCounter() {
+        return exchangeFactoryManager.getConsumerCounter();
+    }
+
+    @Override
+    public Integer getTotalPooled() {
+        return exchangeFactoryManager.getPooledCounter();
+    }
+
+    @Override
     public Integer getCapacity() {
         return exchangeFactoryManager.getCapacity();
     }
@@ -77,7 +87,27 @@ public class ManagedExchangeFactoryManager extends ManagedService implements Man
     }
 
     @Override
-    public TabularData listPools() {
+    public Long getTotalCreated() {
+        return exchangeFactoryManager.getStatistics().getCreatedCounter();
+    }
+
+    @Override
+    public Long getTotalAcquired() {
+        return exchangeFactoryManager.getStatistics().getAcquiredCounter();
+    }
+
+    @Override
+    public Long getTotalReleased() {
+        return exchangeFactoryManager.getStatistics().getReleasedCounter();
+    }
+
+    @Override
+    public Long getTotalDiscarded() {
+        return exchangeFactoryManager.getStatistics().getDiscardedCounter();
+    }
+
+    @Override
+    public TabularData listStatistics() {
         try {
             TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listExchangeFactoryTabularType());
             Collection<ExchangeFactory> factories = exchangeFactoryManager.getExchangeFactories();
@@ -91,15 +121,19 @@ public class ManagedExchangeFactoryManager extends ManagedService implements Man
                 int capacity = ef.getCapacity();
                 int size = ef.getSize();
                 long created = 0;
+                long acquired = 0;
                 long released = 0;
+                long discarded = 0;
                 if (ef.isStatisticsEnabled()) {
                     created = ef.getStatistics().getCreatedCounter();
+                    acquired = ef.getStatistics().getAcquiredCounter();
                     released = ef.getStatistics().getReleasedCounter();
+                    discarded = ef.getStatistics().getDiscardedCounter();
                 }
 
                 CompositeData data = new CompositeDataSupport(
-                        ct, new String[] { "url", "capacity", "pooled", "created", "released" },
-                        new Object[] { url, capacity, size, created, released });
+                        ct, new String[] { "url", "capacity", "pooled", "created", "acquired", "released", "discarded" },
+                        new Object[] { url, capacity, size, created, acquired, released, discarded });
                 answer.put(data);
             }
             return answer;
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
index 0779481..87948df 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ManagedNonManagedServiceTest extends ManagementTestSupport {
 
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Test
     public void testService() throws Exception {
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedPooledExchangeTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedPooledExchangeTest.java
new file mode 100644
index 0000000..163c7bc
--- /dev/null
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedPooledExchangeTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+public class ManagedPooledExchangeTest extends ManagementTestSupport {
+
+    private final AtomicInteger counter = new AtomicInteger();
+    private final AtomicReference<Exchange> ref = new AtomicReference<>();
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        PooledExchangeFactory pef = new PooledExchangeFactory();
+        pef.setStatisticsEnabled(true);
+        pef.setCapacity(123);
+        context.adapt(ExtendedCamelContext.class).setExchangeFactory(pef);
+
+        return context;
+    }
+
+    @Test
+    public void testSameExchange() throws Exception {
+        // JMX tests don't work well on AIX CI servers (they hang them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+        mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3, 5);
+        mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4, 6);
+        mock.message(0).header("first").isEqualTo(true);
+        mock.message(1).header("first").isNull();
+        mock.message(2).header("first").isNull();
+
+        context.getRouteController().startAllRoutes();
+
+        assertMockEndpointsSatisfied();
+
+        // get the MBean server used to query the exchange factory manager
+        MBeanServer mbeanServer = getMBeanServer();
+
+        // get the object name for the exchange factory manager
+        ObjectName on
+                = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultExchangeFactoryManager");
+
+        String state = (String) mbeanServer.getAttribute(on, "State");
+        assertEquals(ServiceStatus.Started.name(), state);
+
+        Integer con = (Integer) mbeanServer.getAttribute(on, "ConsumerCounter");
+        assertEquals(1, con.intValue());
+
+        Integer cap = (Integer) mbeanServer.getAttribute(on, "Capacity");
+        assertEquals(123, cap.intValue());
+
+        // also only 1 exchange pooled
+        con = (Integer) mbeanServer.getAttribute(on, "TotalPooled");
+        assertEquals(1, con.intValue());
+
+        Long num = (Long) mbeanServer.getAttribute(on, "TotalCreated");
+        assertEquals(1, num.intValue());
+
+        num = (Long) mbeanServer.getAttribute(on, "TotalAcquired");
+        assertEquals(2, num.intValue());
+
+        num = (Long) mbeanServer.getAttribute(on, "TotalReleased");
+        assertEquals(3, num.intValue());
+
+        num = (Long) mbeanServer.getAttribute(on, "TotalDiscarded");
+        assertEquals(0, num.intValue());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:foo?period=1&delay=1&repeatCount=3").noAutoStartup()
+                        .setProperty("myprop", counter::incrementAndGet)
+                        .setHeader("myheader", counter::incrementAndGet)
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                // should be the same exchange instance as it is pooled
+                                Exchange old = ref.get();
+                                if (old == null) {
+                                    ref.set(exchange);
+                                    exchange.getMessage().setHeader("first", true);
+                                } else {
+                                    assertSame(old, exchange);
+                                }
+                            }
+                        })
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
index c88836f..faa9bc7 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementTestSupport {
 
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
index bfef164..b589ecd 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
@@ -36,7 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
diff --git a/core/camel-management/src/test/resources/log4j2.properties b/core/camel-management/src/test/resources/log4j2.properties
index 3f1dcba..2e68815 100644
--- a/core/camel-management/src/test/resources/log4j2.properties
+++ b/core/camel-management/src/test/resources/log4j2.properties
@@ -46,4 +46,4 @@ rootLogger.appenderRef.file.ref = file
 #rootLogger.appenderRef.console.ref = console
 
 #logger.camel-core.name = org.apache.camel
-#logger.camel-core.level = TRACE
+#logger.camel-core.level = DEBUG