You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/20 16:18:56 UTC

[camel] branch exchange-factory updated (862be97 -> 7702b7c)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 862be97  CAMEL-16222: PooledExchangeFactory experiment
     new 4aff4a4  CAMEL-16222: PooledExchangeFactory experiment
     new 97b9aaf  CAMEL-16222: PooledExchangeFactory experiment
     new 4e1a7d3  CAMEL-16222: PooledExchangeFactory experiment
     new f6d0603  CAMEL-16222: PooledExchangeFactory experiment
     new 7702b7c  CAMEL-16222: PooledExchangeFactory experiment

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/camel/ExtendedExchange.java    |  36 -
 .../main/java/org/apache/camel/PooledExchange.java |  65 ++
 .../java/org/apache/camel/spi/ExchangeFactory.java |  15 +-
 .../camel/impl/engine/DefaultExchangeFactory.java  |  18 +-
 .../camel/impl/engine/DefaultUnitOfWork.java       |  20 +-
 .../camel/impl/engine/PooledExchangeFactory.java   |  63 +-
 .../MainConfigurationPropertiesConfigurer.java     |  12 +
 .../camel-main-configuration-metadata.json         |   2 +
 core/camel-main/src/main/docs/main.adoc            |   2 +
 .../camel/main/DefaultConfigurationConfigurer.java |   6 +
 .../camel/main/DefaultConfigurationProperties.java |  31 +
 ...{DefaultExchange.java => AbstractExchange.java} | 194 ++----
 .../org/apache/camel/support/DefaultConsumer.java  |   3 +-
 .../org/apache/camel/support/DefaultExchange.java  | 763 +--------------------
 .../camel/support/DefaultPooledExchange.java       | 170 +++++
 .../camel/support/PollingConsumerSupport.java      |   5 +-
 16 files changed, 418 insertions(+), 987 deletions(-)
 create mode 100644 core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
 copy core/camel-support/src/main/java/org/apache/camel/support/{DefaultExchange.java => AbstractExchange.java} (80%)
 create mode 100644 core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java


[camel] 03/05: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4e1a7d3a2c47ba9ed10627ddf097be19c4d4d31a
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 20 16:30:27 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 34ef0cf..9bc469f 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -35,10 +35,6 @@ import org.apache.camel.Exchange;
  */
 public interface ExchangeFactory {
 
-    // TODO: release from extended exchange without onCompletion (overhead)
-    // TODO: reuse unit of work (expensive to create)
-    // TODO: release via DoneUoW in less expensive way
-
     /**
      * Service factory key.
      */


[camel] 05/05: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7702b7c30b9f610b05c24c6d1111f8c3f61abc6d
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 20 17:18:14 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../MainConfigurationPropertiesConfigurer.java     | 12 +++++++++
 .../camel-main-configuration-metadata.json         |  2 ++
 core/camel-main/src/main/docs/main.adoc            |  2 ++
 .../camel/main/DefaultConfigurationConfigurer.java |  6 +++++
 .../camel/main/DefaultConfigurationProperties.java | 31 ++++++++++++++++++++++
 5 files changed, 53 insertions(+)

diff --git a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java
index 797ed98..2cdf094 100644
--- a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java
+++ b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java
@@ -67,6 +67,10 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp
         case "EndpointLazyStartProducer": target.setEndpointLazyStartProducer(property(camelContext, boolean.class, value)); return true;
         case "endpointruntimestatisticsenabled":
         case "EndpointRuntimeStatisticsEnabled": target.setEndpointRuntimeStatisticsEnabled(property(camelContext, boolean.class, value)); return true;
+        case "exchangefactory":
+        case "ExchangeFactory": target.setExchangeFactory(property(camelContext, java.lang.String.class, value)); return true;
+        case "exchangefactorystatisticsenabled":
+        case "ExchangeFactoryStatisticsEnabled": target.setExchangeFactoryStatisticsEnabled(property(camelContext, boolean.class, value)); return true;
         case "fileconfigurations":
         case "FileConfigurations": target.setFileConfigurations(property(camelContext, java.lang.String.class, value)); return true;
         case "inflightrepositorybrowseenabled":
@@ -248,6 +252,10 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp
         case "EndpointLazyStartProducer": return boolean.class;
         case "endpointruntimestatisticsenabled":
         case "EndpointRuntimeStatisticsEnabled": return boolean.class;
+        case "exchangefactory":
+        case "ExchangeFactory": return java.lang.String.class;
+        case "exchangefactorystatisticsenabled":
+        case "ExchangeFactoryStatisticsEnabled": return boolean.class;
         case "fileconfigurations":
         case "FileConfigurations": return java.lang.String.class;
         case "inflightrepositorybrowseenabled":
@@ -430,6 +438,10 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp
         case "EndpointLazyStartProducer": return target.isEndpointLazyStartProducer();
         case "endpointruntimestatisticsenabled":
         case "EndpointRuntimeStatisticsEnabled": return target.isEndpointRuntimeStatisticsEnabled();
+        case "exchangefactory":
+        case "ExchangeFactory": return target.getExchangeFactory();
+        case "exchangefactorystatisticsenabled":
+        case "ExchangeFactoryStatisticsEnabled": return target.isExchangeFactoryStatisticsEnabled();
         case "fileconfigurations":
         case "FileConfigurations": return target.getFileConfigurations();
         case "inflightrepositorybrowseenabled":
diff --git a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json
index d443467..076f94a 100644
--- a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json
+++ b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json
@@ -33,6 +33,8 @@
     { "name": "camel.main.endpointBridgeErrorHandler", "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN\/ERROR level and ignored. The default va [...]
     { "name": "camel.main.endpointLazyStartProducer", "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first mes [...]
     { "name": "camel.main.endpointRuntimeStatisticsEnabled", "description": "Sets whether endpoint runtime statistics is enabled (gathers runtime usage of each incoming and outgoing endpoints). The default value is false.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean" },
+    { "name": "camel.main.exchangeFactory", "description": "Experimental: Controls whether to pool (reuse) exchanges or create new fresh exchanges (default). Using pooled will reduce JVM garbage collection overhead by avoiding to re-create Exchange instances per message each consumer receives.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String", "defaultValue": "default", "enum": [ "default", "pooled" ] },
+    { "name": "camel.main.exchangeFactoryStatisticsEnabled", "description": "Configures whether statistics is enabled on exchange factory.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean" },
     { "name": "camel.main.fileConfigurations", "description": "Directory to load additional configuration files that contains configuration values that takes precedence over any other configuration. This can be used to refer to files that may have secret configuration that has been mounted on the file system for containers. You can specify a pattern to load from sub directories and a name pattern such as \/var\/app\/secret\/.properties, multiple directories can be separated by comma.", " [...]
     { "name": "camel.main.inflightRepositoryBrowseEnabled", "description": "Sets whether the inflight repository should allow browsing each inflight exchange. This is by default disabled as there is a very slight performance overhead when enabled.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean" },
     { "name": "camel.main.javaRoutesExcludePattern", "description": "Used for exclusive filtering RouteBuilder classes which are collected from the registry or via classpath scanning. The exclusive filtering takes precedence over inclusive filtering. The pattern is using Ant-path style pattern. Multiple patterns can be specified separated by comma. For example to exclude all classes starting with Bar use: &#42;&#42;\/Bar&#42; To exclude all routes form a specific package use: com\/mycomp [...]
diff --git a/core/camel-main/src/main/docs/main.adoc b/core/camel-main/src/main/docs/main.adoc
index 435375a..9847bb0 100644
--- a/core/camel-main/src/main/docs/main.adoc
+++ b/core/camel-main/src/main/docs/main.adoc
@@ -45,6 +45,8 @@ The following table lists all the options:
 | *camel.main.endpointBridgeError{zwsp}Handler* | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN/ERROR level and ignored. The default value is false. |  | boolean
 | *camel.main.endpointLazyStart{zwsp}Producer* | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed the [...]
 | *camel.main.endpointRuntime{zwsp}StatisticsEnabled* | Sets whether endpoint runtime statistics is enabled (gathers runtime usage of each incoming and outgoing endpoints). The default value is false. |  | boolean
+| *camel.main.exchangeFactory* | Experimental: Controls whether to pool (reuse) exchanges or create new fresh exchanges (default). Using pooled will reduce JVM garbage collection overhead by avoiding to re-create Exchange instances per message each consumer receives. | default | String
+| *camel.main.exchangeFactory{zwsp}StatisticsEnabled* | Configures whether statistics is enabled on exchange factory. |  | boolean
 | *camel.main.fileConfigurations* | Directory to load additional configuration files that contains configuration values that takes precedence over any other configuration. This can be used to refer to files that may have secret configuration that has been mounted on the file system for containers. You can specify a pattern to load from sub directories and a name pattern such as /var/app/secret/.properties, multiple directories can be separated by comma. |  | String
 | *camel.main.inflightRepository{zwsp}BrowseEnabled* | Sets whether the inflight repository should allow browsing each inflight exchange. This is by default disabled as there is a very slight performance overhead when enabled. |  | boolean
 | *camel.main.javaRoutesExclude{zwsp}Pattern* | Used for exclusive filtering RouteBuilder classes which are collected from the registry or via classpath scanning. The exclusive filtering takes precedence over inclusive filtering. The pattern is using Ant-path style pattern. Multiple patterns can be specified separated by comma. For example to exclude all classes starting with Bar use: &#42;&#42;/Bar&#42; To exclude all routes form a specific package use: com/mycompany/bar/&#42; To exclud [...]
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
index 9c3be48..34016ca 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
@@ -31,6 +31,7 @@ import org.apache.camel.cluster.CamelClusterService;
 import org.apache.camel.health.HealthCheckRegistry;
 import org.apache.camel.health.HealthCheckRepository;
 import org.apache.camel.impl.debugger.BacklogTracer;
+import org.apache.camel.impl.engine.PooledExchangeFactory;
 import org.apache.camel.model.Model;
 import org.apache.camel.model.ModelCamelContext;
 import org.apache.camel.model.ModelLifecycleStrategy;
@@ -120,6 +121,11 @@ public final class DefaultConfigurationConfigurer {
         }
         ecc.getBeanIntrospection().afterPropertiesConfigured(camelContext);
 
+        if ("pooled".equals(config.getExchangeFactory())) {
+            ecc.setExchangeFactory(new PooledExchangeFactory());
+        }
+        ecc.getExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled());
+
         if (!config.isJmxEnabled()) {
             camelContext.disableJMX();
         }
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
index c66cd89..ef780a2 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
@@ -88,6 +88,9 @@ public abstract class DefaultConfigurationProperties<T> {
     private String routesIncludePattern = "classpath:camel/*.xml,classpath:camel-template/*.xml,classpath:camel-rest/*.xml";
     private String routesExcludePattern;
     private boolean lightweight;
+    @Metadata(defaultValue = "default", enums = "default,pooled")
+    private String exchangeFactory = "default";
+    private boolean exchangeFactoryStatisticsEnabled;
     // route controller
     @Metadata(defaultValue = "DEBUG")
     @Deprecated
@@ -927,6 +930,34 @@ public abstract class DefaultConfigurationProperties<T> {
         this.lightweight = lightweight;
     }
 
+    @Experimental
+    public String getExchangeFactory() {
+        return exchangeFactory;
+    }
+
+    /**
+     * Experimental: Controls whether to pool (reuse) exchanges or create new fresh exchanges (default). Using pooled
+     * will reduce JVM garbage collection overhead by avoiding to re-create Exchange instances per message each consumer
+     * receives.
+     */
+    @Experimental
+    public void setExchangeFactory(String exchangeFactory) {
+        this.exchangeFactory = exchangeFactory;
+    }
+
+    @Experimental
+    public boolean isExchangeFactoryStatisticsEnabled() {
+        return exchangeFactoryStatisticsEnabled;
+    }
+
+    /**
+     * Configures whether statistics is enabled on exchange factory.
+     */
+    @Experimental
+    public void setExchangeFactoryStatisticsEnabled(boolean exchangeFactoryStatisticsEnabled) {
+        this.exchangeFactoryStatisticsEnabled = exchangeFactoryStatisticsEnabled;
+    }
+
     @Deprecated
     public LoggingLevel getRouteControllerLoggingLevel() {
         return routeControllerLoggingLevel;


[camel] 02/05: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 97b9aaf618e491901b2414725e8fd96d307c2941
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 20 15:22:02 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 ...{DefaultExchange.java => AbstractExchange.java} |  46 +-
 .../org/apache/camel/support/DefaultConsumer.java  |  11 +-
 .../org/apache/camel/support/DefaultExchange.java  | 683 +--------------------
 .../camel/support/DefaultPooledExchange.java       |  11 +-
 .../camel/support/PollingConsumerSupport.java      |   5 +-
 5 files changed, 53 insertions(+), 703 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
similarity index 96%
copy from core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
copy to core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index adb804c..6e5daa6 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -40,16 +40,20 @@ import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.util.ObjectHelper;
 
 /**
- * The default and only implementation of {@link Exchange}.
+ * Base class for the two official and only implementations of {@link Exchange}, the {@link DefaultExchange} and
+ * {@link DefaultPooledExchange}.
+ *
+ * Camel end users should use {@link DefaultExchange} if creating an {@link Exchange} manually. However, that is
+ * seldom necessary, as exchanges are created via {@link Endpoint}.
+ *
+ * @see DefaultExchange
  */
-public class DefaultExchange implements ExtendedExchange {
+class AbstractExchange implements ExtendedExchange {
 
-    // TODO: AbstractExchange and move somewhere, and have DefaultExchange as thin
-
-    private final CamelContext context;
-    long created;
+    final CamelContext context;
     // optimize to create properties always and with a reasonable small size
     final Map<String, Object> properties = new ConcurrentHashMap<>(8);
+    long created;
     Message in;
     Message out;
     Exception exception;
@@ -72,19 +76,19 @@ public class DefaultExchange implements ExtendedExchange {
     boolean redeliveryExhausted;
     Boolean errorHandlerHandled;
 
-    public DefaultExchange(CamelContext context) {
+    public AbstractExchange(CamelContext context) {
         this.context = context;
         this.pattern = ExchangePattern.InOnly;
         this.created = System.currentTimeMillis();
     }
 
-    public DefaultExchange(CamelContext context, ExchangePattern pattern) {
+    public AbstractExchange(CamelContext context, ExchangePattern pattern) {
         this.context = context;
         this.pattern = pattern;
         this.created = System.currentTimeMillis();
     }
 
-    public DefaultExchange(Exchange parent) {
+    public AbstractExchange(Exchange parent) {
         this.context = parent.getContext();
         this.pattern = parent.getPattern();
         this.created = parent.getCreated();
@@ -93,14 +97,14 @@ public class DefaultExchange implements ExtendedExchange {
         this.unitOfWork = parent.getUnitOfWork();
     }
 
-    public DefaultExchange(Endpoint fromEndpoint) {
+    public AbstractExchange(Endpoint fromEndpoint) {
         this.context = fromEndpoint.getCamelContext();
         this.pattern = ExchangePattern.InOnly;
         this.created = System.currentTimeMillis();
         this.fromEndpoint = fromEndpoint;
     }
 
-    public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
+    public AbstractExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
         this.context = fromEndpoint.getCamelContext();
         this.pattern = pattern;
         this.created = System.currentTimeMillis();
@@ -108,16 +112,6 @@ public class DefaultExchange implements ExtendedExchange {
     }
 
     @Override
-    public String toString() {
-        // do not output information about the message as it may contain sensitive information
-        if (exchangeId != null) {
-            return "Exchange[" + exchangeId + "]";
-        } else {
-            return "Exchange[]";
-        }
-    }
-
-    @Override
     public long getCreated() {
         return created;
     }
@@ -717,4 +711,14 @@ public class DefaultExchange implements ExtendedExchange {
         return context.getUuidGenerator().generateUuid();
     }
 
+    @Override
+    public String toString() {
+        // do not output information about the message as it may contain sensitive information
+        if (exchangeId != null) {
+            return "Exchange[" + exchangeId + "]";
+        } else {
+            return "Exchange[]";
+        }
+    }
+
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 685f8c1..2bd8cc2 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -16,7 +16,16 @@
  */
 package org.apache.camel.support;
 
-import org.apache.camel.*;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.PooledExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.RouteAware;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index adb804c..198484a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -16,705 +16,34 @@
  */
 package org.apache.camel.support;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.Message;
-import org.apache.camel.MessageHistory;
-import org.apache.camel.spi.HeadersMapFactory;
-import org.apache.camel.spi.Synchronization;
-import org.apache.camel.spi.UnitOfWork;
-import org.apache.camel.util.ObjectHelper;
 
 /**
  * The default and only implementation of {@link Exchange}.
  */
-public class DefaultExchange implements ExtendedExchange {
-
-    // TODO: AbstractExchange and move somewhere, and have DefaultExchange as thin
-
-    private final CamelContext context;
-    long created;
-    // optimize to create properties always and with a reasonable small size
-    final Map<String, Object> properties = new ConcurrentHashMap<>(8);
-    Message in;
-    Message out;
-    Exception exception;
-    String exchangeId;
-    UnitOfWork unitOfWork;
-    ExchangePattern pattern;
-    Endpoint fromEndpoint;
-    String fromRouteId;
-    List<Synchronization> onCompletions;
-    Boolean externalRedelivered;
-    String historyNodeId;
-    String historyNodeLabel;
-    boolean transacted;
-    boolean routeStop;
-    boolean rollbackOnly;
-    boolean rollbackOnlyLast;
-    boolean notifyEvent;
-    boolean interrupted;
-    boolean interruptable = true;
-    boolean redeliveryExhausted;
-    Boolean errorHandlerHandled;
+public final class DefaultExchange extends AbstractExchange {
 
     public DefaultExchange(CamelContext context) {
-        this.context = context;
-        this.pattern = ExchangePattern.InOnly;
-        this.created = System.currentTimeMillis();
+        super(context);
     }
 
     public DefaultExchange(CamelContext context, ExchangePattern pattern) {
-        this.context = context;
-        this.pattern = pattern;
-        this.created = System.currentTimeMillis();
+        super(context, pattern);
     }
 
     public DefaultExchange(Exchange parent) {
-        this.context = parent.getContext();
-        this.pattern = parent.getPattern();
-        this.created = parent.getCreated();
-        this.fromEndpoint = parent.getFromEndpoint();
-        this.fromRouteId = parent.getFromRouteId();
-        this.unitOfWork = parent.getUnitOfWork();
+        super(parent);
     }
 
     public DefaultExchange(Endpoint fromEndpoint) {
-        this.context = fromEndpoint.getCamelContext();
-        this.pattern = ExchangePattern.InOnly;
-        this.created = System.currentTimeMillis();
-        this.fromEndpoint = fromEndpoint;
+        super(fromEndpoint);
     }
 
     public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
-        this.context = fromEndpoint.getCamelContext();
-        this.pattern = pattern;
-        this.created = System.currentTimeMillis();
-        this.fromEndpoint = fromEndpoint;
-    }
-
-    @Override
-    public String toString() {
-        // do not output information about the message as it may contain sensitive information
-        if (exchangeId != null) {
-            return "Exchange[" + exchangeId + "]";
-        } else {
-            return "Exchange[]";
-        }
-    }
-
-    @Override
-    public long getCreated() {
-        return created;
-    }
-
-    @Override
-    public Exchange copy() {
-        DefaultExchange exchange = new DefaultExchange(this);
-
-        exchange.setIn(getIn().copy());
-        exchange.getIn().setBody(getIn().getBody());
-        if (getIn().hasHeaders()) {
-            exchange.getIn().setHeaders(safeCopyHeaders(getIn().getHeaders()));
-        }
-        if (hasOut()) {
-            exchange.setOut(getOut().copy());
-            exchange.getOut().setBody(getOut().getBody());
-            if (getOut().hasHeaders()) {
-                exchange.getOut().setHeaders(safeCopyHeaders(getOut().getHeaders()));
-            }
-        }
-
-        exchange.setException(exception);
-        exchange.setRouteStop(routeStop);
-        exchange.setRollbackOnly(rollbackOnly);
-        exchange.setRollbackOnlyLast(rollbackOnlyLast);
-        exchange.setNotifyEvent(notifyEvent);
-        exchange.setRedeliveryExhausted(redeliveryExhausted);
-        exchange.setErrorHandlerHandled(errorHandlerHandled);
-
-        // copy properties after body as body may trigger lazy init
-        if (hasProperties()) {
-            safeCopyProperties(getProperties(), exchange.getProperties());
-        }
-
-        return exchange;
-    }
-
-    private Map<String, Object> safeCopyHeaders(Map<String, Object> headers) {
-        if (headers == null) {
-            return null;
-        }
-
-        if (context != null) {
-            ExtendedCamelContext ecc = (ExtendedCamelContext) context;
-            HeadersMapFactory factory = ecc.getHeadersMapFactory();
-            if (factory != null) {
-                return factory.newMap(headers);
-            }
-        }
-        // should not really happen but some tests dont start camel context
-        return new HashMap<>(headers);
-    }
-
-    @SuppressWarnings("unchecked")
-    private void safeCopyProperties(Map<String, Object> source, Map<String, Object> target) {
-        target.putAll(source);
-        if (getContext().isMessageHistory()) {
-            // safe copy message history using a defensive copy
-            List<MessageHistory> history = (List<MessageHistory>) target.remove(Exchange.MESSAGE_HISTORY);
-            if (history != null) {
-                // use thread-safe list as message history may be accessed concurrently
-                target.put(Exchange.MESSAGE_HISTORY, new CopyOnWriteArrayList<>(history));
-            }
-        }
-    }
-
-    @Override
-    public CamelContext getContext() {
-        return context;
-    }
-
-    @Override
-    public Object getProperty(String name) {
-        return properties.get(name);
-    }
-
-    @Override
-    public Object getProperty(String name, Object defaultValue) {
-        Object answer = getProperty(name);
-        return answer != null ? answer : defaultValue;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <T> T getProperty(String name, Class<T> type) {
-        Object value = getProperty(name);
-        if (value == null) {
-            // lets avoid NullPointerException when converting to boolean for null values
-            if (boolean.class == type) {
-                return (T) Boolean.FALSE;
-            }
-            return null;
-        }
-
-        // eager same instance type test to avoid the overhead of invoking the type converter
-        // if already same type
-        if (type.isInstance(value)) {
-            return (T) value;
-        }
-
-        return ExchangeHelper.convertToType(this, type, value);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <T> T getProperty(String name, Object defaultValue, Class<T> type) {
-        Object value = getProperty(name);
-        if (value == null) {
-            value = defaultValue;
-        }
-        if (value == null) {
-            // lets avoid NullPointerException when converting to boolean for null values
-            if (boolean.class == type) {
-                return (T) Boolean.FALSE;
-            }
-            return null;
-        }
-
-        // eager same instance type test to avoid the overhead of invoking the type converter
-        // if already same type
-        if (type.isInstance(value)) {
-            return (T) value;
-        }
-
-        return ExchangeHelper.convertToType(this, type, value);
-    }
-
-    @Override
-    public void setProperty(String name, Object value) {
-        if (value != null) {
-            // avoid the NullPointException
-            properties.put(name, value);
-        } else {
-            // if the value is null, we just remove the key from the map
-            if (name != null) {
-                properties.remove(name);
-            }
-        }
-    }
-
-    @Override
-    public void setProperties(Map<String, Object> properties) {
-        this.properties.clear();
-        this.properties.putAll(properties);
-    }
-
-    @Override
-    public Object removeProperty(String name) {
-        if (!hasProperties()) {
-            return null;
-        }
-        return properties.remove(name);
-    }
-
-    @Override
-    public boolean removeProperties(String pattern) {
-        return removeProperties(pattern, (String[]) null);
-    }
-
-    @Override
-    public boolean removeProperties(String pattern, String... excludePatterns) {
-        if (!hasProperties()) {
-            return false;
-        }
-
-        // special optimized
-        if (excludePatterns == null && "*".equals(pattern)) {
-            properties.clear();
-            return true;
-        }
-
-        // store keys to be removed as we cannot loop and remove at the same time in implementations such as HashMap
-        Set<String> toBeRemoved = null;
-        boolean matches = false;
-        for (String key : properties.keySet()) {
-            if (PatternHelper.matchPattern(key, pattern)) {
-                if (excludePatterns != null && PatternHelper.isExcludePatternMatch(key, excludePatterns)) {
-                    continue;
-                }
-                matches = true;
-                if (toBeRemoved == null) {
-                    toBeRemoved = new HashSet<>();
-                }
-                toBeRemoved.add(key);
-            }
-        }
-
-        if (matches) {
-            if (toBeRemoved.size() == properties.size()) {
-                // special optimization when all should be removed
-                properties.clear();
-            } else {
-                for (String key : toBeRemoved) {
-                    properties.remove(key);
-                }
-            }
-        }
-
-        return matches;
-    }
-
-    @Override
-    public Map<String, Object> getProperties() {
-        return properties;
-    }
-
-    @Override
-    public boolean hasProperties() {
-        return !properties.isEmpty();
-    }
-
-    @Override
-    public Message getIn() {
-        if (in == null) {
-            in = new DefaultMessage(getContext());
-            configureMessage(in);
-        }
-        return in;
-    }
-
-    @Override
-    public <T> T getIn(Class<T> type) {
-        Message in = getIn();
-
-        // eager same instance type test to avoid the overhead of invoking the type converter
-        // if already same type
-        if (type.isInstance(in)) {
-            return type.cast(in);
-        }
-
-        // fallback to use type converter
-        return context.getTypeConverter().convertTo(type, this, in);
-    }
-
-    @Override
-    public void setIn(Message in) {
-        this.in = in;
-        configureMessage(in);
-    }
-
-    @Override
-    public Message getOut() {
-        // lazy create
-        if (out == null) {
-            out = (in instanceof MessageSupport)
-                    ? ((MessageSupport) in).newInstance() : new DefaultMessage(getContext());
-            configureMessage(out);
-        }
-        return out;
-    }
-
-    @Override
-    public <T> T getOut(Class<T> type) {
-        if (!hasOut()) {
-            return null;
-        }
-
-        Message out = getOut();
-
-        // eager same instance type test to avoid the overhead of invoking the type converter
-        // if already same type
-        if (type.isInstance(out)) {
-            return type.cast(out);
-        }
-
-        // fallback to use type converter
-        return context.getTypeConverter().convertTo(type, this, out);
-    }
-
-    @Override
-    public boolean hasOut() {
-        return out != null;
-    }
-
-    @Override
-    public void setOut(Message out) {
-        this.out = out;
-        configureMessage(out);
-    }
-
-    @Override
-    public Message getMessage() {
-        return hasOut() ? getOut() : getIn();
-    }
-
-    @Override
-    public <T> T getMessage(Class<T> type) {
-        return hasOut() ? getOut(type) : getIn(type);
-    }
-
-    @Override
-    public void setMessage(Message message) {
-        if (hasOut()) {
-            setOut(message);
-        } else {
-            setIn(message);
-        }
-    }
-
-    @Override
-    public Exception getException() {
-        return exception;
-    }
-
-    @Override
-    public <T> T getException(Class<T> type) {
-        return ObjectHelper.getException(type, exception);
-    }
-
-    @Override
-    public void setException(Throwable t) {
-        if (t == null) {
-            this.exception = null;
-        } else if (t instanceof Exception) {
-            this.exception = (Exception) t;
-        } else {
-            // wrap throwable into an exception
-            this.exception = CamelExecutionException.wrapCamelExecutionException(this, t);
-        }
-        if (t instanceof InterruptedException) {
-            // mark the exchange as interrupted due to the interrupt exception
-            setInterrupted(true);
-        }
-    }
-
-    @Override
-    public <T extends Exchange> T adapt(Class<T> type) {
-        return type.cast(this);
-    }
-
-    @Override
-    public ExchangePattern getPattern() {
-        return pattern;
-    }
-
-    @Override
-    public void setPattern(ExchangePattern pattern) {
-        this.pattern = pattern;
-    }
-
-    @Override
-    public Endpoint getFromEndpoint() {
-        return fromEndpoint;
-    }
-
-    @Override
-    public void setFromEndpoint(Endpoint fromEndpoint) {
-        this.fromEndpoint = fromEndpoint;
-    }
-
-    @Override
-    public String getFromRouteId() {
-        return fromRouteId;
-    }
-
-    @Override
-    public void setFromRouteId(String fromRouteId) {
-        this.fromRouteId = fromRouteId;
-    }
-
-    @Override
-    public String getExchangeId() {
-        if (exchangeId == null) {
-            exchangeId = createExchangeId();
-        }
-        return exchangeId;
-    }
-
-    @Override
-    public void setExchangeId(String id) {
-        this.exchangeId = id;
-    }
-
-    @Override
-    public boolean isFailed() {
-        return exception != null;
-    }
-
-    @Override
-    public boolean isTransacted() {
-        return transacted;
-    }
-
-    @Override
-    public void setTransacted(boolean transacted) {
-        this.transacted = true;
-    }
-
-    @Override
-    public boolean isRouteStop() {
-        return routeStop;
-    }
-
-    @Override
-    public void setRouteStop(boolean routeStop) {
-        this.routeStop = routeStop;
-    }
-
-    @Override
-    public boolean isExternalRedelivered() {
-        if (externalRedelivered == null) {
-            // lets avoid adding methods to the Message API, so we use the
-            // DefaultMessage to allow component specific messages to extend
-            // and implement the isExternalRedelivered method.
-            Message msg = getIn();
-            if (msg instanceof DefaultMessage) {
-                externalRedelivered = ((DefaultMessage) msg).isTransactedRedelivered();
-            }
-            // not from a transactional resource so mark it as false by default
-            if (externalRedelivered == null) {
-                externalRedelivered = false;
-            }
-        }
-        return externalRedelivered;
-    }
-
-    @Override
-    public boolean isRollbackOnly() {
-        return rollbackOnly;
-    }
-
-    @Override
-    public void setRollbackOnly(boolean rollbackOnly) {
-        this.rollbackOnly = rollbackOnly;
-    }
-
-    @Override
-    public boolean isRollbackOnlyLast() {
-        return rollbackOnlyLast;
-    }
-
-    @Override
-    public void setRollbackOnlyLast(boolean rollbackOnlyLast) {
-        this.rollbackOnlyLast = rollbackOnlyLast;
-    }
-
-    @Override
-    public UnitOfWork getUnitOfWork() {
-        return unitOfWork;
-    }
-
-    @Override
-    public void setUnitOfWork(UnitOfWork unitOfWork) {
-        this.unitOfWork = unitOfWork;
-        if (unitOfWork != null && onCompletions != null) {
-            // now an unit of work has been assigned so add the on completions
-            // we might have registered already
-            for (Synchronization onCompletion : onCompletions) {
-                unitOfWork.addSynchronization(onCompletion);
-            }
-            // cleanup the temporary on completion list as they now have been registered
-            // on the unit of work
-            onCompletions.clear();
-            onCompletions = null;
-        }
-    }
-
-    @Override
-    public void addOnCompletion(Synchronization onCompletion) {
-        if (unitOfWork == null) {
-            // unit of work not yet registered so we store the on completion temporary
-            // until the unit of work is assigned to this exchange by the unit of work
-            if (onCompletions == null) {
-                onCompletions = new ArrayList<>();
-            }
-            onCompletions.add(onCompletion);
-        } else {
-            getUnitOfWork().addSynchronization(onCompletion);
-        }
-    }
-
-    @Override
-    public boolean containsOnCompletion(Synchronization onCompletion) {
-        if (unitOfWork != null) {
-            // if there is an unit of work then the completions is moved there
-            return unitOfWork.containsSynchronization(onCompletion);
-        } else {
-            // check temporary completions if no unit of work yet
-            return onCompletions != null && onCompletions.contains(onCompletion);
-        }
-    }
-
-    @Override
-    public void handoverCompletions(Exchange target) {
-        if (onCompletions != null) {
-            for (Synchronization onCompletion : onCompletions) {
-                target.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
-            }
-            // cleanup the temporary on completion list as they have been handed over
-            onCompletions.clear();
-            onCompletions = null;
-        } else if (unitOfWork != null) {
-            // let unit of work handover
-            unitOfWork.handoverSynchronization(target);
-        }
-    }
-
-    @Override
-    public List<Synchronization> handoverCompletions() {
-        List<Synchronization> answer = null;
-        if (onCompletions != null) {
-            answer = new ArrayList<>(onCompletions);
-            onCompletions.clear();
-            onCompletions = null;
-        }
-        return answer;
-    }
-
-    @Override
-    public String getHistoryNodeId() {
-        return historyNodeId;
-    }
-
-    @Override
-    public void setHistoryNodeId(String historyNodeId) {
-        this.historyNodeId = historyNodeId;
-    }
-
-    @Override
-    public String getHistoryNodeLabel() {
-        return historyNodeLabel;
-    }
-
-    @Override
-    public void setHistoryNodeLabel(String historyNodeLabel) {
-        this.historyNodeLabel = historyNodeLabel;
-    }
-
-    @Override
-    public boolean isNotifyEvent() {
-        return notifyEvent;
-    }
-
-    @Override
-    public void setNotifyEvent(boolean notifyEvent) {
-        this.notifyEvent = notifyEvent;
-    }
-
-    @Override
-    public boolean isInterrupted() {
-        return interrupted;
-    }
-
-    @Override
-    public void setInterrupted(boolean interrupted) {
-        if (interruptable) {
-            this.interrupted = interrupted;
-        }
-    }
-
-    @Override
-    public void setInterruptable(boolean interruptable) {
-        this.interruptable = interruptable;
-    }
-
-    @Override
-    public boolean isRedeliveryExhausted() {
-        return redeliveryExhausted;
-    }
-
-    @Override
-    public void setRedeliveryExhausted(boolean redeliveryExhausted) {
-        this.redeliveryExhausted = redeliveryExhausted;
-    }
-
-    public Boolean getErrorHandlerHandled() {
-        return errorHandlerHandled;
-    }
-
-    @Override
-    public boolean isErrorHandlerHandledSet() {
-        return errorHandlerHandled != null;
-    }
-
-    @Override
-    public boolean isErrorHandlerHandled() {
-        return errorHandlerHandled;
-    }
-
-    @Override
-    public void setErrorHandlerHandled(Boolean errorHandlerHandled) {
-        this.errorHandlerHandled = errorHandlerHandled;
-    }
-
-    /**
-     * Configures the message after it has been set on the exchange
-     */
-    protected void configureMessage(Message message) {
-        if (message instanceof MessageSupport) {
-            MessageSupport messageSupport = (MessageSupport) message;
-            messageSupport.setExchange(this);
-            messageSupport.setCamelContext(getContext());
-        }
-    }
-
-    protected String createExchangeId() {
-        return context.getUuidGenerator().generateUuid();
+        super(fromEndpoint, pattern);
     }
 
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index 17ec269..fc4492f 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -18,12 +18,17 @@ package org.apache.camel.support;
 
 import java.util.function.Function;
 
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.PooledExchange;
 
 /**
- * The default {@link PooledExchange}.
+ * The default and only implementation of {@link PooledExchange}.
  */
-public final class DefaultPooledExchange extends DefaultExchange implements PooledExchange {
+public final class DefaultPooledExchange extends AbstractExchange implements PooledExchange {
 
     private Function<Exchange, Boolean> onDone;
     private Class originalInClassType;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
index a0ae0de..6749f6a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.support;
 
-import org.apache.camel.*;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.support.service.ServiceSupport;
 


[camel] 01/05: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4aff4a43aff21f3da611a6b336f44b3c8be80460
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 20 15:08:37 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/ExtendedExchange.java    |  36 -----
 .../main/java/org/apache/camel/PooledExchange.java |  65 ++++++++
 .../camel/impl/engine/DefaultUnitOfWork.java       |  20 +--
 .../camel/impl/engine/PooledExchangeFactory.java   |  22 +--
 .../org/apache/camel/support/DefaultConsumer.java  |  12 +-
 .../org/apache/camel/support/DefaultExchange.java  | 154 +++++--------------
 .../camel/support/DefaultPooledExchange.java       | 165 +++++++++++++++++++++
 7 files changed, 284 insertions(+), 190 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index dec68a9..bb0b523 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -18,7 +18,6 @@ package org.apache.camel;
 
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.UnitOfWork;
@@ -30,41 +29,6 @@ import org.apache.camel.spi.UnitOfWork;
 public interface ExtendedExchange extends Exchange {
 
     /**
-     * Registers a task to run when this exchange is done.
-     * <p/>
-     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
-     */
-    void onDone(Function<Exchange, Boolean> task);
-
-    /**
-     * When the exchange is done being used.
-     * <p/>
-     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
-     */
-    void done(boolean forced);
-
-    /**
-     * Resets the exchange for reuse with the given created timestamp;
-     * <p/>
-     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
-     */
-    void reset(long created);
-
-    /**
-     * Whether this exchange was created to auto release when its unit of work is done
-     * <p/>
-     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
-     */
-    void setAutoRelease(boolean autoRelease);
-
-    /**
-     * Whether this exchange was created to auto release when its unit of work is done
-     * <p/>
-     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
-     */
-    boolean isAutoRelease();
-
-    /**
      * Sets the endpoint which originated this message exchange. This method should typically only be called by
      * {@link Endpoint} implementations
      */
diff --git a/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java b/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
new file mode 100644
index 0000000..de0ad66
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.util.function.Function;
+
+import org.apache.camel.spi.ExchangeFactory;
+
+/**
+ * Pooled {@link Exchange} which contains the methods and APIs that are not intended for Camel end users but used
+ * internally by Camel for optimizing memory footprint by reusing exchanges created by {@link Consumer}s via
+ * {@link ExchangeFactory}.
+ */
+public interface PooledExchange extends ExtendedExchange {
+
+    /**
+     * Registers a task to run when this exchange is done.
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
+     */
+    void onDone(Function<Exchange, Boolean> task);
+
+    /**
+     * When the exchange is done being used.
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
+     */
+    void done(boolean forced);
+
+    /**
+     * Resets the exchange for reuse with the given created timestamp.
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
+     */
+    void reset(long created);
+
+    /**
+     * Sets whether this exchange was created to auto release when its unit of work is done.
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
+     */
+    void setAutoRelease(boolean autoRelease);
+
+    /**
+     * Whether this exchange was created to auto release when its unit of work is done
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
+     */
+    boolean isAutoRelease();
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 372072a..ee9bf54 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -25,13 +25,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.function.Predicate;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
-import org.apache.camel.Route;
+import org.apache.camel.*;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.SynchronizationVetoable;
@@ -248,11 +242,13 @@ public class DefaultUnitOfWork implements UnitOfWork {
         }
 
         // the exchange is now done
-        try {
-            exchange.adapt(ExtendedExchange.class).done(false);
-        } catch (Throwable e) {
-            // must catch exceptions to ensure synchronizations is also invoked
-            log.warn("Exception occurred during exchange done. This exception will be ignored.", e);
+        if (exchange instanceof PooledExchange) {
+            try {
+                ((PooledExchange) exchange).done(false);
+            } catch (Throwable e) {
+                // must catch exceptions to ensure synchronizations are also invoked
+                log.warn("Exception occurred during exchange done. This exception will be ignored.", e);
+            }
         }
     }
 
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 83e619b..c5b51b4 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -19,17 +19,9 @@ package org.apache.camel.impl.engine;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Experimental;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.NonManagedService;
-import org.apache.camel.StaticService;
+import org.apache.camel.*;
 import org.apache.camel.spi.ExchangeFactory;
-import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.DefaultPooledExchange;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.URISupport;
@@ -99,7 +91,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            ExtendedExchange answer = new DefaultExchange(camelContext);
+            PooledExchange answer = new DefaultPooledExchange(camelContext);
             answer.setAutoRelease(autoRelease);
             if (autoRelease) {
                 // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
@@ -111,7 +103,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 acquired.incrementAndGet();
             }
             // reset exchange for reuse
-            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+            PooledExchange ee = exchange.adapt(PooledExchange.class);
             ee.reset(System.currentTimeMillis());
         }
         return exchange;
@@ -125,7 +117,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            ExtendedExchange answer = new DefaultExchange(fromEndpoint);
+            PooledExchange answer = new DefaultPooledExchange(fromEndpoint);
             answer.setAutoRelease(autoRelease);
             if (autoRelease) {
                 // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
@@ -137,7 +129,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 acquired.incrementAndGet();
             }
             // reset exchange for reuse
-            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+            PooledExchange ee = exchange.adapt(PooledExchange.class);
             ee.reset(System.currentTimeMillis());
         }
         return exchange;
@@ -147,7 +139,7 @@ public class PooledExchangeFactory extends ServiceSupport
     public boolean release(Exchange exchange) {
         try {
             // done exchange before returning back to pool
-            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+            PooledExchange ee = exchange.adapt(PooledExchange.class);
             boolean force = !ee.isAutoRelease();
             ee.done(force);
             ee.onDone(null);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 2c00dc1..685f8c1 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -16,15 +16,7 @@
  */
 package org.apache.camel.support;
 
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.Processor;
-import org.apache.camel.Route;
-import org.apache.camel.RouteAware;
+import org.apache.camel.*;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
@@ -137,7 +129,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
         if (exchange != null) {
             if (!autoRelease) {
                 // if not auto release we must manually force done
-                exchange.adapt(ExtendedExchange.class).done(true);
+                exchange.adapt(PooledExchange.class).done(true);
             }
             exchangeFactory.release(exchange);
         }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index a6955b1..adb804c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.function.Function;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExecutionException;
@@ -43,51 +42,46 @@ import org.apache.camel.util.ObjectHelper;
 /**
  * The default and only implementation of {@link Exchange}.
  */
-public final class DefaultExchange implements ExtendedExchange {
+public class DefaultExchange implements ExtendedExchange {
+
+    // TODO: extract an AbstractExchange base class, move it elsewhere, and keep DefaultExchange as a thin subclass
 
     private final CamelContext context;
-    private Function<Exchange, Boolean> onDone;
-    private long created;
+    long created;
     // optimize to create properties always and with a reasonable small size
-    private final Map<String, Object> properties = new ConcurrentHashMap<>(8);
-    private Class originalInClassType;
-    private Message in;
-    private Message originalOut;
-    private Message out;
-    private Exception exception;
-    private String exchangeId;
-    private UnitOfWork unitOfWork;
-    private final ExchangePattern originalPattern;
-    private ExchangePattern pattern;
-    private Endpoint fromEndpoint;
-    private String fromRouteId;
-    private List<Synchronization> onCompletions;
-    private Boolean externalRedelivered;
-    private String historyNodeId;
-    private String historyNodeLabel;
-    private boolean transacted;
-    private boolean routeStop;
-    private boolean rollbackOnly;
-    private boolean rollbackOnlyLast;
-    private boolean notifyEvent;
-    private boolean interrupted;
-    private boolean interruptable = true;
-    private boolean redeliveryExhausted;
-    private Boolean errorHandlerHandled;
-    private boolean autoRelease;
+    final Map<String, Object> properties = new ConcurrentHashMap<>(8);
+    Message in;
+    Message out;
+    Exception exception;
+    String exchangeId;
+    UnitOfWork unitOfWork;
+    ExchangePattern pattern;
+    Endpoint fromEndpoint;
+    String fromRouteId;
+    List<Synchronization> onCompletions;
+    Boolean externalRedelivered;
+    String historyNodeId;
+    String historyNodeLabel;
+    boolean transacted;
+    boolean routeStop;
+    boolean rollbackOnly;
+    boolean rollbackOnlyLast;
+    boolean notifyEvent;
+    boolean interrupted;
+    boolean interruptable = true;
+    boolean redeliveryExhausted;
+    Boolean errorHandlerHandled;
 
     public DefaultExchange(CamelContext context) {
         this.context = context;
         this.pattern = ExchangePattern.InOnly;
         this.created = System.currentTimeMillis();
-        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(CamelContext context, ExchangePattern pattern) {
         this.context = context;
         this.pattern = pattern;
         this.created = System.currentTimeMillis();
-        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(Exchange parent) {
@@ -97,7 +91,6 @@ public final class DefaultExchange implements ExtendedExchange {
         this.fromEndpoint = parent.getFromEndpoint();
         this.fromRouteId = parent.getFromRouteId();
         this.unitOfWork = parent.getUnitOfWork();
-        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(Endpoint fromEndpoint) {
@@ -105,7 +98,6 @@ public final class DefaultExchange implements ExtendedExchange {
         this.pattern = ExchangePattern.InOnly;
         this.created = System.currentTimeMillis();
         this.fromEndpoint = fromEndpoint;
-        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
@@ -113,7 +105,6 @@ public final class DefaultExchange implements ExtendedExchange {
         this.pattern = pattern;
         this.created = System.currentTimeMillis();
         this.fromEndpoint = fromEndpoint;
-        this.originalPattern = this.pattern;
     }
 
     @Override
@@ -126,68 +117,6 @@ public final class DefaultExchange implements ExtendedExchange {
         }
     }
 
-    public boolean isAutoRelease() {
-        return autoRelease;
-    }
-
-    public void setAutoRelease(boolean autoRelease) {
-        this.autoRelease = autoRelease;
-    }
-
-    @Override
-    public void onDone(Function<Exchange, Boolean> task) {
-        this.onDone = task;
-    }
-
-    public void done(boolean forced) {
-        if (created > 0 && (forced || autoRelease)) {
-            this.created = 0; // by setting to 0 we also flag that this exchange is done and needs to be reset to use again
-            this.properties.clear();
-            this.exchangeId = null;
-            if (in != null && in.getClass() == originalInClassType) {
-                // okay we can reuse in
-                in.reset();
-            } else {
-                this.in = null;
-            }
-            if (out != null) {
-                out.reset();
-                this.out = null;
-            }
-            if (this.unitOfWork != null) {
-                this.unitOfWork.reset();
-            }
-            this.exception = null;
-            // reset pattern to original
-            this.pattern = originalPattern;
-            if (this.onCompletions != null) {
-                this.onCompletions.clear();
-            }
-            // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again
-            this.externalRedelivered = null;
-            this.historyNodeId = null;
-            this.historyNodeLabel = null;
-            this.transacted = false;
-            this.routeStop = false;
-            this.rollbackOnly = false;
-            this.rollbackOnlyLast = false;
-            this.notifyEvent = false;
-            this.interrupted = false;
-            this.interruptable = true;
-            this.redeliveryExhausted = false;
-            this.errorHandlerHandled = null;
-
-            if (onDone != null) {
-                onDone.apply(this);
-            }
-        }
-    }
-
-    @Override
-    public void reset(long created) {
-        this.created = created;
-    }
-
     @Override
     public long getCreated() {
         return created;
@@ -404,7 +333,6 @@ public final class DefaultExchange implements ExtendedExchange {
     public Message getIn() {
         if (in == null) {
             in = new DefaultMessage(getContext());
-            originalInClassType = in.getClass();
             configureMessage(in);
         }
         return in;
@@ -428,23 +356,15 @@ public final class DefaultExchange implements ExtendedExchange {
     public void setIn(Message in) {
         this.in = in;
         configureMessage(in);
-        if (in != null) {
-            this.originalInClassType = in.getClass();
-        }
     }
 
     @Override
     public Message getOut() {
         // lazy create
         if (out == null) {
-            if (originalOut != null) {
-                out = originalOut;
-            } else {
-                // we can only optimize OUT when its using a default message instance
-                out = new DefaultMessage(this);
-                configureMessage(out);
-                originalOut = out;
-            }
+            out = (in instanceof MessageSupport)
+                    ? ((MessageSupport) in).newInstance() : new DefaultMessage(getContext());
+            configureMessage(out);
         }
         return out;
     }
@@ -475,10 +395,7 @@ public final class DefaultExchange implements ExtendedExchange {
     @Override
     public void setOut(Message out) {
         this.out = out;
-        if (out != null) {
-            configureMessage(out);
-            this.originalOut = null; // we use custom out
-        }
+        configureMessage(out);
     }
 
     @Override
@@ -645,7 +562,7 @@ public final class DefaultExchange implements ExtendedExchange {
     @Override
     public void setUnitOfWork(UnitOfWork unitOfWork) {
         this.unitOfWork = unitOfWork;
-        if (unitOfWork != null && onCompletions != null && !onCompletions.isEmpty()) {
+        if (unitOfWork != null && onCompletions != null) {
             // now an unit of work has been assigned so add the on completions
             // we might have registered already
             for (Synchronization onCompletion : onCompletions) {
@@ -654,6 +571,7 @@ public final class DefaultExchange implements ExtendedExchange {
             // cleanup the temporary on completion list as they now have been registered
             // on the unit of work
             onCompletions.clear();
+            onCompletions = null;
         }
     }
 
@@ -667,7 +585,7 @@ public final class DefaultExchange implements ExtendedExchange {
             }
             onCompletions.add(onCompletion);
         } else {
-            unitOfWork.addSynchronization(onCompletion);
+            getUnitOfWork().addSynchronization(onCompletion);
         }
     }
 
@@ -684,12 +602,13 @@ public final class DefaultExchange implements ExtendedExchange {
 
     @Override
     public void handoverCompletions(Exchange target) {
-        if (onCompletions != null && !onCompletions.isEmpty()) {
+        if (onCompletions != null) {
             for (Synchronization onCompletion : onCompletions) {
                 target.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
             }
             // cleanup the temporary on completion list as they have been handed over
             onCompletions.clear();
+            onCompletions = null;
         } else if (unitOfWork != null) {
             // let unit of work handover
             unitOfWork.handoverSynchronization(target);
@@ -699,9 +618,10 @@ public final class DefaultExchange implements ExtendedExchange {
     @Override
     public List<Synchronization> handoverCompletions() {
         List<Synchronization> answer = null;
-        if (onCompletions != null && !onCompletions.isEmpty()) {
+        if (onCompletions != null) {
             answer = new ArrayList<>(onCompletions);
             onCompletions.clear();
+            onCompletions = null;
         }
         return answer;
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
new file mode 100644
index 0000000..17ec269
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.function.Function;
+
+import org.apache.camel.*;
+
+/**
+ * The default {@link PooledExchange}.
+ * <p>
+ * Calling {@link #done(boolean)} clears the per-use state so that the same instance can be used again; the
+ * endpoint and route id the exchange was created from are left untouched.
+ */
+public final class DefaultPooledExchange extends DefaultExchange implements PooledExchange {
+
+    // task invoked as the last step of done(boolean), after the state has been cleared
+    private Function<Exchange, Boolean> onDone;
+    // class of the IN message as recorded by getIn()/setIn(); done(boolean) reuses the IN only on a match
+    private Class<?> originalInClassType;
+    // lazily created default OUT message, remembered so getOut() can return it again once out is cleared
+    private Message originalOut;
+    // pattern the exchange was created with; done(boolean) restores it
+    private final ExchangePattern originalPattern;
+    // whether done(boolean) should clean up also when it is not forced
+    private boolean autoRelease;
+
+    public DefaultPooledExchange(CamelContext context) {
+        super(context);
+        this.originalPattern = getPattern();
+    }
+
+    public DefaultPooledExchange(CamelContext context, ExchangePattern pattern) {
+        super(context, pattern);
+        this.originalPattern = pattern;
+    }
+
+    public DefaultPooledExchange(Exchange parent) {
+        super(parent);
+        this.originalPattern = parent.getPattern();
+    }
+
+    public DefaultPooledExchange(Endpoint fromEndpoint) {
+        super(fromEndpoint);
+        this.originalPattern = getPattern();
+    }
+
+    public DefaultPooledExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
+        super(fromEndpoint, pattern);
+        this.originalPattern = pattern;
+    }
+
+    /**
+     * Whether {@link #done(boolean)} cleans up the exchange even when it is not forced.
+     */
+    public boolean isAutoRelease() {
+        return autoRelease;
+    }
+
+    /**
+     * Sets whether {@link #done(boolean)} cleans up the exchange even when it is not forced.
+     */
+    public void setAutoRelease(boolean autoRelease) {
+        this.autoRelease = autoRelease;
+    }
+
+    /**
+     * Registers the task that {@link #done(boolean)} invokes with this exchange once its state is cleared.
+     */
+    @Override
+    public void onDone(Function<Exchange, Boolean> task) {
+        this.onDone = task;
+    }
+
+    /**
+     * Clears the state of this exchange so it can be used again, and then invokes the on-done task, if any.
+     * <p>
+     * Does nothing when the exchange is already done, or when it is neither forced nor in auto release mode.
+     *
+     * @param forced whether to clean up even when auto release is disabled
+     */
+    public void done(boolean forced) {
+        if (created > 0 && (forced || autoRelease)) {
+            this.created = 0; // by setting to 0 we also flag that this exchange is done and needs to be reset to use again
+            this.properties.clear();
+            this.exchangeId = null;
+            if (in != null && in.getClass() == originalInClassType) {
+                // okay we can reuse in
+                in.reset();
+            } else {
+                this.in = null;
+            }
+            if (out != null) {
+                out.reset();
+                this.out = null;
+            }
+            if (this.unitOfWork != null) {
+                this.unitOfWork.reset();
+            }
+            this.exception = null;
+            // reset pattern to original
+            this.pattern = originalPattern;
+            if (this.onCompletions != null) {
+                this.onCompletions.clear();
+            }
+            // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again
+            this.externalRedelivered = null;
+            this.historyNodeId = null;
+            this.historyNodeLabel = null;
+            this.transacted = false;
+            this.routeStop = false;
+            this.rollbackOnly = false;
+            this.rollbackOnlyLast = false;
+            this.notifyEvent = false;
+            this.interrupted = false;
+            this.interruptable = true;
+            this.redeliveryExhausted = false;
+            this.errorHandlerHandled = null;
+
+            if (onDone != null) {
+                onDone.apply(this);
+            }
+        }
+    }
+
+    /**
+     * Sets the created timestamp; {@link #done(boolean)} only acts while this value is greater than zero.
+     */
+    @Override
+    public void reset(long created) {
+        this.created = created;
+    }
+
+    @Override
+    public Message getIn() {
+        if (in == null) {
+            in = new DefaultMessage(getContext());
+            originalInClassType = in.getClass();
+            configureMessage(in);
+        }
+        return in;
+    }
+
+    @Override
+    public void setIn(Message in) {
+        this.in = in;
+        configureMessage(in);
+        if (in != null) {
+            this.originalInClassType = in.getClass();
+        }
+    }
+
+    @Override
+    public Message getOut() {
+        // lazy create
+        if (out == null) {
+            if (originalOut != null) {
+                out = originalOut;
+            } else {
+                // we can only optimize OUT when its using a default message instance
+                out = new DefaultMessage(this);
+                configureMessage(out);
+                originalOut = out;
+            }
+        }
+        return out;
+    }
+
+    @Override
+    public void setOut(Message out) {
+        this.out = out;
+        if (out != null) {
+            configureMessage(out);
+            this.originalOut = null; // we use custom out
+        }
+    }
+
+}


[camel] 04/05: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f6d0603c6098bc210ef13ff1befc408b04eb40e6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 20 17:07:51 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/spi/ExchangeFactory.java | 11 ++++
 .../camel/impl/engine/DefaultExchangeFactory.java  | 18 ++++++-
 .../camel/impl/engine/PooledExchangeFactory.java   | 63 ++++++++++------------
 3 files changed, 55 insertions(+), 37 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 9bc469f..197d473 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -72,4 +72,15 @@ public interface ExchangeFactory {
     default boolean release(Exchange exchange) {
         return true;
     }
+
+    /**
+     * Whether statistics is enabled.
+     */
+    boolean isStatisticsEnabled();
+
+    /**
+     * Sets whether statistics is enabled.
+     */
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
index a8db865..469fb7c 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
@@ -16,14 +16,18 @@
  */
 package org.apache.camel.impl.engine;
 
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
 
 /**
  * Default {@link ExchangeFactory} that creates a new {@link Exchange} instance.
  */
-public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware {
+public final class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware {
 
     private CamelContext camelContext;
 
@@ -53,4 +57,14 @@ public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAwar
         return new DefaultExchange(fromEndpoint);
     }
 
+    @Override
+    public boolean isStatisticsEnabled() {
+        return false;
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        // not in use
+    }
+
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index c5b51b4..4b228eb0 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -19,10 +19,17 @@ package org.apache.camel.impl.engine;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Experimental;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.PooledExchange;
+import org.apache.camel.StaticService;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultPooledExchange;
-import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
@@ -32,7 +39,7 @@ import org.slf4j.LoggerFactory;
  * Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a pool.
  */
 @Experimental
-public class PooledExchangeFactory extends ServiceSupport
+public final class PooledExchangeFactory extends ServiceSupport
         implements ExchangeFactory, CamelContextAware, StaticService, NonManagedService {
 
     // TODO: optimize onDone lambdas as they will be created per instance, and we can use static linked
@@ -40,7 +47,6 @@ public class PooledExchangeFactory extends ServiceSupport
     private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class);
 
     private final Consumer consumer;
-    private final ReleaseOnCompletion onCompletion = new ReleaseOnCompletion();
     private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>();
     private final AtomicLong acquired = new AtomicLong();
     private final AtomicLong created = new AtomicLong();
@@ -48,7 +54,7 @@ public class PooledExchangeFactory extends ServiceSupport
     private final AtomicLong discarded = new AtomicLong();
 
     private CamelContext camelContext;
-    private boolean statisticsEnabled = true;
+    private boolean statisticsEnabled;
 
     public PooledExchangeFactory() {
         this.consumer = null;
@@ -91,13 +97,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            PooledExchange answer = new DefaultPooledExchange(camelContext);
-            answer.setAutoRelease(autoRelease);
-            if (autoRelease) {
-                // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
-                answer.onDone(this::release);
-            }
-            return answer;
+            exchange = createPooledExchange(null, autoRelease);
         } else {
             if (statisticsEnabled) {
                 acquired.incrementAndGet();
@@ -117,13 +117,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            PooledExchange answer = new DefaultPooledExchange(fromEndpoint);
-            answer.setAutoRelease(autoRelease);
-            if (autoRelease) {
-                // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
-                answer.onDone(this::release);
-            }
-            return answer;
+            exchange = createPooledExchange(fromEndpoint, autoRelease);
         } else {
             if (statisticsEnabled) {
                 acquired.incrementAndGet();
@@ -158,6 +152,21 @@ public class PooledExchangeFactory extends ServiceSupport
         }
     }
 
+    /**
+     * Builds a brand new pooled exchange, bound to the endpoint when one is given and otherwise to the
+     * camel context, and registers the release task on it when auto release is enabled.
+     */
+    protected PooledExchange createPooledExchange(Endpoint fromEndpoint, boolean autoRelease) {
+        PooledExchange pooled = fromEndpoint == null
+                ? new DefaultPooledExchange(camelContext) : new DefaultPooledExchange(fromEndpoint);
+        pooled.setAutoRelease(autoRelease);
+        if (autoRelease) {
+            // a consumer is either always in auto release mode or never, so setting the task once at creation is safe
+            pooled.onDone(this::release);
+        }
+        return pooled;
+    }
+
     @Override
     protected void doStop() throws Exception {
         pool.clear();
@@ -180,20 +189,4 @@ public class PooledExchangeFactory extends ServiceSupport
         discarded.set(0);
     }
 
-    private final class ReleaseOnCompletion extends SynchronizationAdapter {
-
-        @Override
-        public int getOrder() {
-            // should be very very last so set as highest value possible
-            return Integer.MAX_VALUE;
-        }
-
-        @Override
-        public void onDone(Exchange exchange) {
-            if (exchange != null) {
-                release(exchange);
-            }
-        }
-    }
-
 }