You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/05 16:47:09 UTC

[camel] 01/05: CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 59c8d801a254879e9b72745bc16933023a5682c6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 5 16:03:59 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
---
 .../org/apache/camel/ExtendedCamelContext.java     |  11 ++
 .../java/org/apache/camel/spi/ExchangeFactory.java |   3 +
 .../apache/camel/spi/ProcessorExchangeFactory.java |  71 +++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    |  24 ++++
 .../engine/PrototypeProcessorExchangeFactory.java  | 136 +++++++++++++++++++++
 .../camel/impl/engine/SimpleCamelContext.java      |  12 ++
 .../camel/impl/ExtendedCamelContextConfigurer.java |   6 +
 .../camel/impl/lw/LightweightCamelContext.java     |  11 ++
 .../impl/lw/LightweightRuntimeCamelContext.java    |  13 ++
 .../apache/camel/processor/WireTapProcessor.java   |  34 ++++--
 .../org/apache/camel/reifier/WireTapReifier.java   |   3 +-
 11 files changed, 309 insertions(+), 15 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 654cc52..b105f47 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -54,6 +54,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
@@ -237,6 +238,16 @@ public interface ExtendedCamelContext extends CamelContext {
     void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager);
 
     /**
+     * Gets the processor exchange factory to use.
+     */
+    ProcessorExchangeFactory getProcessorExchangeFactory();
+
+    /**
+     * Sets a custom processor exchange factory to use.
+     */
+    void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory);
+
+    /**
      * Returns the bean post processor used to do any bean customization.
      *
      * @return the bean post processor.
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 75b9d21..83a5e04 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -33,6 +33,9 @@ import org.apache.camel.NonManagedService;
  * <p/>
  * The factory is pluggable which allows to use different strategies. The default factory will create a new
  * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
+ *
+ * @see ProcessorExchangeFactory
+ * @see org.apache.camel.PooledExchange
  */
 public interface ExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware {
 
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
new file mode 100644
index 0000000..77142d4
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Processor;
+
+/**
+ * Factory used by {@link org.apache.camel.Processor} (EIPs) when they create copies of the processed {@link Exchange}.
+ * <p/>
+ * Some EIPs like WireTap, Multicast, Split etc creates copies of the processed exchange which they use as sub
+ * exchanges. This factory allows to use exchange pooling.
+ *
+ * The factory is pluggable which allows to use different strategies. The default factory will create a new
+ * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
+ *
+ * @see ExchangeFactory
+ * @see org.apache.camel.PooledExchange
+ */
+public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware, IdAware {
+
+    /**
+     * Service factory key.
+     */
+    String FACTORY = "processor-exchange-factory";
+
+    /**
+     * The processor using this factory.
+     */
+    Processor getProcessor();
+
+    /**
+     * Creates a new {@link ProcessorExchangeFactory} that is private for the given consumer.
+     *
+     * @param  processor the processor that will use the created {@link ProcessorExchangeFactory}
+     * @return           the created factory.
+     */
+    ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor);
+
+    /**
+     * Creates a copy of the given {@link Exchange}
+     *
+     * @param exchange original copy of the exchange
+     * @param handover whether the on completion callbacks should be handed over to the new copy.
+     */
+    Exchange createCorrelatedCopy(Exchange exchange, boolean handover);
+
+    /**
+     * Releases the exchange back into the pool
+     *
+     * @param  exchange the exchange
+     * @return          true if released into the pool, or false if something went wrong and the exchange was discarded
+     */
+    boolean release(Exchange exchange);
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index afa7832..c1d8fd1 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -130,6 +130,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.ReactiveExecutor;
@@ -271,6 +272,7 @@ public abstract class AbstractCamelContext extends BaseService
     private volatile CamelContextNameStrategy nameStrategy;
     private volatile ExchangeFactoryManager exchangeFactoryManager;
     private volatile ExchangeFactory exchangeFactory;
+    private volatile ProcessorExchangeFactory processorExchangeFactory;
     private volatile ReactiveExecutor reactiveExecutor;
     private volatile ManagementNameStrategy managementNameStrategy;
     private volatile Registry registry;
@@ -3701,6 +3703,7 @@ public abstract class AbstractCamelContext extends BaseService
         asyncProcessorAwaitManager = null;
         exchangeFactory = null;
         exchangeFactoryManager = null;
+        processorExchangeFactory = null;
         registry = null;
     }
 
@@ -4715,6 +4718,25 @@ public abstract class AbstractCamelContext extends BaseService
     }
 
     @Override
+    public ProcessorExchangeFactory getProcessorExchangeFactory() {
+        if (processorExchangeFactory == null) {
+            synchronized (lock) {
+                if (processorExchangeFactory == null) {
+                    setProcessorExchangeFactory(createProcessorExchangeFactory());
+                }
+            }
+        }
+        return processorExchangeFactory;
+    }
+
+    @Override
+    public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) {
+        // automatic inject camel context
+        processorExchangeFactory.setCamelContext(this);
+        this.processorExchangeFactory = processorExchangeFactory;
+    }
+
+    @Override
     public ReactiveExecutor getReactiveExecutor() {
         if (reactiveExecutor == null) {
             synchronized (lock) {
@@ -4810,6 +4832,8 @@ public abstract class AbstractCamelContext extends BaseService
 
     protected abstract ExchangeFactoryManager createExchangeFactoryManager();
 
+    protected abstract ProcessorExchangeFactory createProcessorExchangeFactory();
+
     protected abstract HealthCheckRegistry createHealthCheckRegistry();
 
     protected abstract ReactiveExecutor createReactiveExecutor();
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
new file mode 100644
index 0000000..0b10211
--- /dev/null
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.PooledObjectFactorySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link org.apache.camel.spi.ProcessorExchangeFactory} that creates a new {@link Exchange} instance.
+ */
+public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySupport<Exchange>
+        implements ProcessorExchangeFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrototypeProcessorExchangeFactory.class);
+
+    final Processor processor;
+    String routeId;
+    String id;
+
+    public PrototypeProcessorExchangeFactory() {
+        this.processor = null;
+    }
+
+    public PrototypeProcessorExchangeFactory(Processor processor) {
+        this.processor = processor;
+    }
+
+    @Override
+    public String getRouteId() {
+        return routeId;
+    }
+
+    @Override
+    public void setRouteId(String routeId) {
+        this.routeId = routeId;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public Processor getProcessor() {
+        return processor;
+    }
+
+    @Override
+    public ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor) {
+        PrototypeProcessorExchangeFactory answer = new PrototypeProcessorExchangeFactory(processor);
+        answer.setStatisticsEnabled(statisticsEnabled);
+        answer.setCapacity(capacity);
+        answer.setCamelContext(camelContext);
+        return answer;
+    }
+
+    @Override
+    public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
+        return ExchangeHelper.createCorrelatedCopy(exchange, handover);
+    }
+
+    @Override
+    public Exchange acquire() {
+        throw new UnsupportedOperationException("Not in use");
+    }
+
+    @Override
+    public boolean release(Exchange exchange) {
+        if (statisticsEnabled) {
+            statistics.released.increment();
+        }
+        return true;
+    }
+
+    @Override
+    public boolean isPooled() {
+        return false;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        logUsageSummary(LOG, "PrototypeProcessorExchangeFactory", 0);
+    }
+
+    void logUsageSummary(Logger log, String name, int pooled) {
+        if (statisticsEnabled && processor != null) {
+            // only log if there is any usage
+            long created = statistics.getCreatedCounter();
+            long acquired = statistics.getAcquiredCounter();
+            long released = statistics.getReleasedCounter();
+            long discarded = statistics.getDiscardedCounter();
+            boolean shouldLog = pooled > 0 || created > 0 || acquired > 0 || released > 0 || discarded > 0;
+            if (shouldLog) {
+                String rid = getRouteId();
+                String pid = getId();
+
+                // are there any leaks?
+                boolean leak = created + acquired > released + discarded;
+                if (leak) {
+                    long leaks = (created + acquired) - (released + discarded);
+                    log.info(
+                            "{} {} ({}) usage (leaks detected: {}) [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]",
+                            name, rid, pid, leaks, pooled, created, acquired, released, discarded);
+                } else {
+                    log.info("{} {} ({}) usage [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]",
+                            name, rid, pid, pooled, created, acquired, released, discarded);
+                }
+            }
+        }
+    }
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index 3c46c48..254fc2d 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -59,6 +59,7 @@ import org.apache.camel.spi.ModelToXMLDumper;
 import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.ReactiveExecutor;
@@ -574,6 +575,17 @@ public class SimpleCamelContext extends AbstractCamelContext {
     }
 
     @Override
+    protected ProcessorExchangeFactory createProcessorExchangeFactory() {
+        Optional<ProcessorExchangeFactory> result = ResolverHelper.resolveService(
+                getCamelContextReference(),
+                getBootstrapFactoryFinder(),
+                ProcessorExchangeFactory.FACTORY,
+                ProcessorExchangeFactory.class);
+
+        return result.orElseGet(PrototypeProcessorExchangeFactory::new);
+    }
+
+    @Override
     protected ReactiveExecutor createReactiveExecutor() {
         Optional<ReactiveExecutor> result = ResolverHelper.resolveService(
                 getCamelContextReference(),
diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index d06c43d..9d5d73a 100644
--- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -121,6 +121,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "PackageScanClassResolver": target.setPackageScanClassResolver(property(camelContext, org.apache.camel.spi.PackageScanClassResolver.class, value)); return true;
         case "packagescanresourceresolver":
         case "PackageScanResourceResolver": target.setPackageScanResourceResolver(property(camelContext, org.apache.camel.spi.PackageScanResourceResolver.class, value)); return true;
+        case "processorexchangefactory":
+        case "ProcessorExchangeFactory": target.setProcessorExchangeFactory(property(camelContext, org.apache.camel.spi.ProcessorExchangeFactory.class, value)); return true;
         case "processorfactory":
         case "ProcessorFactory": target.setProcessorFactory(property(camelContext, org.apache.camel.spi.ProcessorFactory.class, value)); return true;
         case "propertiescomponent":
@@ -294,6 +296,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "PackageScanClassResolver": return org.apache.camel.spi.PackageScanClassResolver.class;
         case "packagescanresourceresolver":
         case "PackageScanResourceResolver": return org.apache.camel.spi.PackageScanResourceResolver.class;
+        case "processorexchangefactory":
+        case "ProcessorExchangeFactory": return org.apache.camel.spi.ProcessorExchangeFactory.class;
         case "processorfactory":
         case "ProcessorFactory": return org.apache.camel.spi.ProcessorFactory.class;
         case "propertiescomponent":
@@ -468,6 +472,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "PackageScanClassResolver": return target.getPackageScanClassResolver();
         case "packagescanresourceresolver":
         case "PackageScanResourceResolver": return target.getPackageScanResourceResolver();
+        case "processorexchangefactory":
+        case "ProcessorExchangeFactory": return target.getProcessorExchangeFactory();
         case "processorfactory":
         case "ProcessorFactory": return target.getProcessorFactory();
         case "propertiescomponent":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
index 6101ee5..626d4cf 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
@@ -114,6 +114,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.ReactiveExecutor;
@@ -1469,6 +1470,16 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam
     }
 
     @Override
+    public ProcessorExchangeFactory getProcessorExchangeFactory() {
+        return getExtendedCamelContext().getProcessorExchangeFactory();
+    }
+
+    @Override
+    public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) {
+        getExtendedCamelContext().setProcessorExchangeFactory(processorExchangeFactory);
+    }
+
+    @Override
     public ReactiveExecutor getReactiveExecutor() {
         return getExtendedCamelContext().getReactiveExecutor();
     }
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
index af98c56..d70e23d 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
@@ -110,6 +110,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.ReactiveExecutor;
@@ -171,6 +172,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
     private final HeadersMapFactory headersMapFactory;
     private final ExchangeFactory exchangeFactory;
     private final ExchangeFactoryManager exchangeFactoryManager;
+    private final ProcessorExchangeFactory processorExchangeFactory;
     private final ReactiveExecutor reactiveExecutor;
     private final AsyncProcessorAwaitManager asyncProcessorAwaitManager;
     private final ExecutorServiceManager executorServiceManager;
@@ -217,6 +219,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
         headersMapFactory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory();
         exchangeFactory = context.adapt(ExtendedCamelContext.class).getExchangeFactory();
         exchangeFactoryManager = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
+        processorExchangeFactory = context.adapt(ExtendedCamelContext.class).getProcessorExchangeFactory();
         reactiveExecutor = context.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         asyncProcessorAwaitManager = context.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         executorServiceManager = context.getExecutorServiceManager();
@@ -1578,6 +1581,16 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
     }
 
     @Override
+    public ProcessorExchangeFactory getProcessorExchangeFactory() {
+        return processorExchangeFactory;
+    }
+
+    @Override
+    public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public ReactiveExecutor getReactiveExecutor() {
         return reactiveExecutor;
     }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index b6c7822..8cf889b 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -39,12 +39,12 @@ import org.apache.camel.StreamCache;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -67,26 +67,28 @@ public class WireTapProcessor extends AsyncProcessorSupport
     private final Processor processor;
     private final AsyncProcessor asyncProcessor;
     private final ExchangePattern exchangePattern;
+    private final boolean copy;
     private final ExecutorService executorService;
     private volatile boolean shutdownExecutorService;
     private final LongAdder taskCount = new LongAdder();
+    private ProcessorExchangeFactory processorExchangeFactory;
     private PooledExchangeTaskFactory taskFactory;
 
     // expression or processor used for populating a new exchange to send
     // as opposed to traditional wiretap that sends a copy of the original exchange
     private Expression newExchangeExpression;
     private List<Processor> newExchangeProcessors;
-    private boolean copy;
     private Processor onPrepare;
 
     public WireTapProcessor(SendDynamicProcessor dynamicSendProcessor, Processor processor, String uri,
-                            ExchangePattern exchangePattern,
+                            ExchangePattern exchangePattern, boolean copy,
                             ExecutorService executorService, boolean shutdownExecutorService, boolean dynamicUri) {
         this.dynamicSendProcessor = dynamicSendProcessor;
         this.uri = uri;
         this.processor = processor;
         this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
         this.exchangePattern = exchangePattern;
+        this.copy = copy;
         ObjectHelper.notNull(executorService, "executorService");
         this.executorService = executorService;
         this.shutdownExecutorService = shutdownExecutorService;
@@ -101,6 +103,9 @@ public class WireTapProcessor extends AsyncProcessorSupport
             public void done(boolean doneSync) {
                 taskCount.decrement();
                 taskFactory.release(WireTapTask.this);
+                if (processorExchangeFactory != null) {
+                    processorExchangeFactory.release(exchange);
+                }
             }
         };
 
@@ -272,7 +277,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
 
     private Exchange configureCopyExchange(Exchange exchange) {
         // must use a copy as we dont want it to cause side effects of the original exchange
-        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+        Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
         // set MEP to InOnly as this wire tap is a fire and forget
         copy.setPattern(ExchangePattern.InOnly);
         // remove STREAM_CACHE_UNIT_OF_WORK property because this wire tap will
@@ -282,6 +287,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
     }
 
     private Exchange configureNewExchange(Exchange exchange) {
+        // no copy so lets just create a new exchange always
         return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
     }
 
@@ -312,10 +318,6 @@ public class WireTapProcessor extends AsyncProcessorSupport
         return copy;
     }
 
-    public void setCopy(boolean copy) {
-        this.copy = copy;
-    }
-
     public Processor getOnPrepare() {
         return onPrepare;
     }
@@ -350,6 +352,12 @@ public class WireTapProcessor extends AsyncProcessorSupport
 
     @Override
     protected void doBuild() throws Exception {
+        if (copy) {
+            // create a per processor exchange factory
+            this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
+                    .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+        }
+
         boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
         if (pooled) {
             taskFactory = new PooledTaskFactory(getId()) {
@@ -370,27 +378,27 @@ public class WireTapProcessor extends AsyncProcessorSupport
         }
         LOG.trace("Using TaskFactory: {}", taskFactory);
 
-        ServiceHelper.buildService(taskFactory, processor);
+        ServiceHelper.buildService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
     protected void doInit() throws Exception {
-        ServiceHelper.initService(taskFactory, processor);
+        ServiceHelper.initService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
     protected void doStart() throws Exception {
-        ServiceHelper.startService(taskFactory, processor);
+        ServiceHelper.startService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(taskFactory, processor);
+        ServiceHelper.stopService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(taskFactory, processor);
+        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, processor);
         if (shutdownExecutorService) {
             getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
         }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index 3d0c214..23a7508 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -84,9 +84,8 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
 
         WireTapProcessor answer = new WireTapProcessor(
                 dynamicSendProcessor, target, uri,
-                parse(ExchangePattern.class, definition.getPattern()),
+                parse(ExchangePattern.class, definition.getPattern()), isCopy,
                 threadPool, shutdownThreadPool, dynamic);
-        answer.setCopy(isCopy);
         Processor newExchangeProcessor = definition.getNewExchangeProcessor();
         String ref = parseString(definition.getNewExchangeProcessorRef());
         if (ref != null) {