You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/05 16:47:08 UTC

[camel] branch master updated (0649f57 -> ab47d9b)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 0649f57  Reduce logging noise
     new 59c8d80  CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
     new b19ba05  CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
     new 54c5f2c  CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
     new c8dd388  camel-core - Fixed java.util.ConcurrentModificationException when using extended endpoint utilization statistics.
     new ab47d9b  CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/camel/ExtendedCamelContext.java     |  11 ++
 .../camel/spi/EndpointUtilizationStatistics.java   |   2 +-
 .../java/org/apache/camel/spi/ExchangeFactory.java |   3 +
 .../apache/camel/spi/ProcessorExchangeFactory.java |  78 +++++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    |  24 ++++
 ...ry.java => PooledProcessorExchangeFactory.java} | 124 ++++++++-------------
 ...java => PrototypeProcessorExchangeFactory.java} | 100 +++++++----------
 .../camel/impl/engine/SimpleCamelContext.java      |  12 ++
 .../camel/impl/ExtendedCamelContextConfigurer.java |   6 +
 .../camel/impl/lw/LightweightCamelContext.java     |  11 ++
 .../impl/lw/LightweightRuntimeCamelContext.java    |  13 +++
 .../apache/camel/processor/WireTapProcessor.java   |  36 +++---
 .../org/apache/camel/reifier/WireTapReifier.java   |   3 +-
 .../camel/main/DefaultConfigurationConfigurer.java |   6 +
 .../DefaultEndpointUtilizationStatistics.java      |   2 +-
 15 files changed, 272 insertions(+), 159 deletions(-)
 create mode 100644 core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
 copy core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/{PooledExchangeFactory.java => PooledProcessorExchangeFactory.java} (51%)
 copy core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/{PrototypeExchangeFactory.java => PrototypeProcessorExchangeFactory.java} (54%)

[camel] 04/05: camel-core - Fixed java.util.ConcurrentModificationException when using extended endpoint utilization statistics.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c8dd38846531b5df0be262a6f7b890934670ecab
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 5 17:37:18 2021 +0200

    camel-core - Fixed java.util.ConcurrentModificationException when using extended endpoint utilization statistics.
---
 .../main/java/org/apache/camel/spi/EndpointUtilizationStatistics.java  | 2 +-
 .../src/main/java/org/apache/camel/processor/WireTapProcessor.java     | 1 -
 .../org/apache/camel/support/DefaultEndpointUtilizationStatistics.java | 3 ++-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/EndpointUtilizationStatistics.java b/core/camel-api/src/main/java/org/apache/camel/spi/EndpointUtilizationStatistics.java
index 990389e..17060fa 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/EndpointUtilizationStatistics.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/EndpointUtilizationStatistics.java
@@ -34,7 +34,7 @@ public interface EndpointUtilizationStatistics {
     int size();
 
     /**
-     * Callback when an endpoint is being utilizated by an {@link org.apache.camel.Processor} EIP such as sending a
+     * Callback when an endpoint is being utilized by an {@link org.apache.camel.Processor} EIP such as sending a
      * message to a dynamic endpoint.
      *
      * @param uri the endpoint uri
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index f3b90d4..f90f7f2 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -44,7 +44,6 @@ import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
-import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpointUtilizationStatistics.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpointUtilizationStatistics.java
index abea2e1..ae024ec 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpointUtilizationStatistics.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpointUtilizationStatistics.java
@@ -18,6 +18,7 @@ package org.apache.camel.support;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 
@@ -43,7 +44,7 @@ public class DefaultEndpointUtilizationStatistics implements EndpointUtilization
     }
 
     @Override
-    public void onHit(String uri) {
+    public synchronized void onHit(String uri) {
         map.compute(uri, (key, current) -> {
             if (current == null) {
                 return 1L;

[camel] 02/05: CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b19ba0569f398f2a0f341834f957401fef8cc743
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 5 16:47:06 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
---
 .../engine/PooledProcessorExchangeFactory.java     | 119 +++++++++++++++++++++
 .../apache/camel/processor/WireTapProcessor.java   |   2 +-
 .../camel/main/DefaultConfigurationConfigurer.java |   6 ++
 3 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
new file mode 100644
index 0000000..d3c37a6
--- /dev/null
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.PooledExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.support.DefaultPooledExchange;
+import org.apache.camel.support.ExchangeHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pooled {@link org.apache.camel.spi.ProcessorExchangeFactory} that reuses {@link Exchange} instance from a pool.
+ */
+public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PooledProcessorExchangeFactory.class);
+
+    public PooledProcessorExchangeFactory() {
+    }
+
+    public PooledProcessorExchangeFactory(Processor processor) {
+        super(processor);
+    }
+
+    @Override
+    public boolean isPooled() {
+        return true;
+    }
+
+    @Override
+    public ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor) {
+        PooledProcessorExchangeFactory answer = new PooledProcessorExchangeFactory(processor);
+        answer.setStatisticsEnabled(statisticsEnabled);
+        answer.setCapacity(capacity);
+        answer.setCamelContext(camelContext);
+        return answer;
+    }
+
+    @Override
+    public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
+        Exchange answer = pool.poll();
+        if (answer == null) {
+            // create a new exchange as there was no free from the pool
+            PooledExchange pe = new DefaultPooledExchange(exchange);
+            ExchangeHelper.copyResults(pe, exchange);
+            // do not reuse message id on copy
+            pe.getIn().setMessageId(null);
+            // do not share the unit of work
+            if (pe.getUnitOfWork() != null) {
+                pe.getUnitOfWork().reset();
+            }
+            if (handover) {
+                // Need to hand over the completion for async invocation
+                pe.adapt(ExtendedExchange.class).handoverCompletions(exchange);
+            }
+            // set a correlation id so we can track back the original exchange
+            pe.setProperty(ExchangePropertyKey.CORRELATION_ID, exchange.getExchangeId());
+            if (statisticsEnabled) {
+                statistics.created.increment();
+            }
+            answer = pe;
+        } else {
+            if (statisticsEnabled) {
+                statistics.acquired.increment();
+            }
+            // reset exchange for reuse
+            PooledExchange ee = (PooledExchange) answer;
+            ee.reset(System.currentTimeMillis());
+        }
+        return answer;
+    }
+
+    @Override
+    public boolean release(Exchange exchange) {
+        try {
+            // done exchange before returning back to pool
+            PooledExchange ee = (PooledExchange) exchange;
+            ee.done(true);
+
+            // only release back in pool if reset was success
+            boolean inserted = pool.offer(exchange);
+
+            if (statisticsEnabled) {
+                if (inserted) {
+                    statistics.released.increment();
+                } else {
+                    statistics.discarded.increment();
+                }
+            }
+            return inserted;
+        } catch (Exception e) {
+            if (statisticsEnabled) {
+                statistics.discarded.increment();
+            }
+            LOG.debug("Error resetting exchange: {}. This exchange is discarded.", exchange);
+            return false;
+        }
+    }
+
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 8cf889b..fff7cb9 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -102,10 +102,10 @@ public class WireTapProcessor extends AsyncProcessorSupport
             @Override
             public void done(boolean doneSync) {
                 taskCount.decrement();
-                taskFactory.release(WireTapTask.this);
                 if (processorExchangeFactory != null) {
                     processorExchangeFactory.release(exchange);
                 }
+                taskFactory.release(WireTapTask.this);
             }
         };
 
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
index 4af36cf..f5e1038 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
@@ -32,7 +32,9 @@ import org.apache.camel.health.HealthCheckRegistry;
 import org.apache.camel.health.HealthCheckRepository;
 import org.apache.camel.impl.debugger.BacklogTracer;
 import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
 import org.apache.camel.impl.engine.PrototypeExchangeFactory;
+import org.apache.camel.impl.engine.PrototypeProcessorExchangeFactory;
 import org.apache.camel.model.Model;
 import org.apache.camel.model.ModelCamelContext;
 import org.apache.camel.model.ModelLifecycleStrategy;
@@ -129,11 +131,15 @@ public final class DefaultConfigurationConfigurer {
 
         if ("pooled".equals(config.getExchangeFactory())) {
             ecc.setExchangeFactory(new PooledExchangeFactory());
+            ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
         } else if ("prototype".equals(config.getExchangeFactory())) {
             ecc.setExchangeFactory(new PrototypeExchangeFactory());
+            ecc.setProcessorExchangeFactory(new PrototypeProcessorExchangeFactory());
         }
         ecc.getExchangeFactory().setCapacity(config.getExchangeFactoryCapacity());
+        ecc.getProcessorExchangeFactory().setCapacity(config.getExchangeFactoryCapacity());
         ecc.getExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled());
+        ecc.getProcessorExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled());
 
         if (!config.isJmxEnabled()) {
             camelContext.disableJMX();

[camel] 05/05: CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ab47d9bc40139d57aa8de833d01075d63b8d13d6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 5 18:35:43 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
---
 .../org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java    | 2 +-
 .../org/apache/camel/support/DefaultEndpointUtilizationStatistics.java  | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
index 697e02f..ab86b3d 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -96,7 +96,7 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
         Exchange answer = pool.poll();
         if (answer == null) {
             // create a new exchange as there was no free from the pool
-            answer = super.create(fromEndpoint, exchangePattern);
+            answer = new DefaultPooledExchange(fromEndpoint, exchangePattern);
             if (statisticsEnabled) {
                 statistics.created.increment();
             }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpointUtilizationStatistics.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpointUtilizationStatistics.java
index ae024ec..6f83953 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpointUtilizationStatistics.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpointUtilizationStatistics.java
@@ -18,7 +18,6 @@ package org.apache.camel.support;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 

[camel] 03/05: CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 54c5f2c3242f0d4c7d9e61bc4e911d9fbad556f7
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 5 17:36:34 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
---
 .../apache/camel/spi/ProcessorExchangeFactory.java |  9 ++++++++-
 .../engine/PooledProcessorExchangeFactory.java     | 22 ++++++++++++++++++++++
 .../engine/PrototypeProcessorExchangeFactory.java  |  8 ++++++++
 .../apache/camel/processor/WireTapProcessor.java   | 13 ++++++-------
 4 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
index 77142d4..c48ad15 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.NonManagedService;
 import org.apache.camel.Processor;
 
@@ -53,7 +55,7 @@ public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>,
     ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor);
 
     /**
-     * Creates a copy of the given {@link Exchange}
+     * Gets a copy of the given {@link Exchange}
      *
      * @param exchange original copy of the exchange
      * @param handover whether the on completion callbacks should be handed over to the new copy.
@@ -61,6 +63,11 @@ public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>,
     Exchange createCorrelatedCopy(Exchange exchange, boolean handover);
 
     /**
+     * Gets a new {@link Exchange}
+     */
+    Exchange create(Endpoint fromEndpoint, ExchangePattern exchangePattern);
+
+    /**
      * Releases the exchange back into the pool
      *
      * @param  exchange the exchange
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
index d3c37a6..697e02f 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.impl.engine;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.PooledExchange;
@@ -90,6 +92,26 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
     }
 
     @Override
+    public Exchange create(Endpoint fromEndpoint, ExchangePattern exchangePattern) {
+        Exchange answer = pool.poll();
+        if (answer == null) {
+            // create a new exchange as there was no free from the pool
+            answer = super.create(fromEndpoint, exchangePattern);
+            if (statisticsEnabled) {
+                statistics.created.increment();
+            }
+        } else {
+            if (statisticsEnabled) {
+                statistics.acquired.increment();
+            }
+            // reset exchange for reuse
+            PooledExchange ee = (PooledExchange) answer;
+            ee.reset(System.currentTimeMillis());
+        }
+        return answer;
+    }
+
+    @Override
     public boolean release(Exchange exchange) {
         try {
             // done exchange before returning back to pool
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
index 0b10211..e4f4d18 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
@@ -16,9 +16,12 @@
  */
 package org.apache.camel.impl.engine;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.PooledObjectFactorySupport;
 import org.slf4j.Logger;
@@ -83,6 +86,11 @@ public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySuppor
     }
 
     @Override
+    public Exchange create(Endpoint fromEndpoint, ExchangePattern exchangePattern) {
+        return new DefaultExchange(fromEndpoint, exchangePattern);
+    }
+
+    @Override
     public Exchange acquire() {
         throw new UnsupportedOperationException("Not in use");
     }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index fff7cb9..f3b90d4 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -287,8 +287,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
     }
 
     private Exchange configureNewExchange(Exchange exchange) {
-        // no copy so lets just create a new exchange always
-        return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
+        return processorExchangeFactory.create(exchange.getFromEndpoint(), ExchangePattern.InOnly);
     }
 
     public List<Processor> getNewExchangeProcessors() {
@@ -352,11 +351,11 @@ public class WireTapProcessor extends AsyncProcessorSupport
 
     @Override
     protected void doBuild() throws Exception {
-        if (copy) {
-            // create a per processor exchange factory
-            this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
-                    .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
-        }
+        // create a per processor exchange factory
+        this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
+                .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+        this.processorExchangeFactory.setRouteId(getRouteId());
+        this.processorExchangeFactory.setId(getId());
 
         boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
         if (pooled) {

[camel] 01/05: CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 59c8d801a254879e9b72745bc16933023a5682c6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 5 16:03:59 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
---
 .../org/apache/camel/ExtendedCamelContext.java     |  11 ++
 .../java/org/apache/camel/spi/ExchangeFactory.java |   3 +
 .../apache/camel/spi/ProcessorExchangeFactory.java |  71 +++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    |  24 ++++
 .../engine/PrototypeProcessorExchangeFactory.java  | 136 +++++++++++++++++++++
 .../camel/impl/engine/SimpleCamelContext.java      |  12 ++
 .../camel/impl/ExtendedCamelContextConfigurer.java |   6 +
 .../camel/impl/lw/LightweightCamelContext.java     |  11 ++
 .../impl/lw/LightweightRuntimeCamelContext.java    |  13 ++
 .../apache/camel/processor/WireTapProcessor.java   |  34 ++++--
 .../org/apache/camel/reifier/WireTapReifier.java   |   3 +-
 11 files changed, 309 insertions(+), 15 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 654cc52..b105f47 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -54,6 +54,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
@@ -237,6 +238,16 @@ public interface ExtendedCamelContext extends CamelContext {
     void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager);
 
     /**
+     * Gets the processor exchange factory to use.
+     */
+    ProcessorExchangeFactory getProcessorExchangeFactory();
+
+    /**
+     * Sets a custom processor exchange factory to use.
+     */
+    void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory);
+
+    /**
      * Returns the bean post processor used to do any bean customization.
      *
      * @return the bean post processor.
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 75b9d21..83a5e04 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -33,6 +33,9 @@ import org.apache.camel.NonManagedService;
  * <p/>
  * The factory is pluggable which allows to use different strategies. The default factory will create a new
  * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
+ *
+ * @see ProcessorExchangeFactory
+ * @see org.apache.camel.PooledExchange
  */
 public interface ExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware {
 
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
new file mode 100644
index 0000000..77142d4
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Processor;
+
+/**
+ * Factory used by {@link org.apache.camel.Processor} (EIPs) when they create copies of the processed {@link Exchange}.
+ * <p/>
+ * Some EIPs like WireTap, Multicast, Split etc. create copies of the processed exchange which they use as sub
+ * exchanges. This factory allows to use exchange pooling.
+ *
+ * The factory is pluggable which allows to use different strategies. The default factory will create a new
+ * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
+ *
+ * @see ExchangeFactory
+ * @see org.apache.camel.PooledExchange
+ */
+public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware, IdAware {
+
+    /**
+     * Service factory key.
+     */
+    String FACTORY = "processor-exchange-factory";
+
+    /**
+     * The processor using this factory.
+     */
+    Processor getProcessor();
+
+    /**
+     * Creates a new {@link ProcessorExchangeFactory} that is private for the given processor.
+     *
+     * @param  processor the processor that will use the created {@link ProcessorExchangeFactory}
+     * @return           the created factory.
+     */
+    ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor);
+
+    /**
+     * Creates a copy of the given {@link Exchange}
+     *
+     * @param exchange original copy of the exchange
+     * @param handover whether the on completion callbacks should be handed over to the new copy.
+     */
+    Exchange createCorrelatedCopy(Exchange exchange, boolean handover);
+
+    /**
+     * Releases the exchange back into the pool
+     *
+     * @param  exchange the exchange
+     * @return          true if released into the pool, or false if something went wrong and the exchange was discarded
+     */
+    boolean release(Exchange exchange);
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index afa7832..c1d8fd1 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -130,6 +130,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.ReactiveExecutor;
@@ -271,6 +272,7 @@ public abstract class AbstractCamelContext extends BaseService
     private volatile CamelContextNameStrategy nameStrategy;
     private volatile ExchangeFactoryManager exchangeFactoryManager;
     private volatile ExchangeFactory exchangeFactory;
+    private volatile ProcessorExchangeFactory processorExchangeFactory;
     private volatile ReactiveExecutor reactiveExecutor;
     private volatile ManagementNameStrategy managementNameStrategy;
     private volatile Registry registry;
@@ -3701,6 +3703,7 @@ public abstract class AbstractCamelContext extends BaseService
         asyncProcessorAwaitManager = null;
         exchangeFactory = null;
         exchangeFactoryManager = null;
+        processorExchangeFactory = null;
         registry = null;
     }
 
@@ -4715,6 +4718,25 @@ public abstract class AbstractCamelContext extends BaseService
     }
 
     @Override
+    public ProcessorExchangeFactory getProcessorExchangeFactory() {
+        if (processorExchangeFactory == null) {
+            synchronized (lock) {
+                if (processorExchangeFactory == null) {
+                    setProcessorExchangeFactory(createProcessorExchangeFactory());
+                }
+            }
+        }
+        return processorExchangeFactory;
+    }
+
+    @Override
+    public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) {
+        // automatically inject the camel context
+        processorExchangeFactory.setCamelContext(this);
+        this.processorExchangeFactory = processorExchangeFactory;
+    }
+
+    @Override
     public ReactiveExecutor getReactiveExecutor() {
         if (reactiveExecutor == null) {
             synchronized (lock) {
@@ -4810,6 +4832,8 @@ public abstract class AbstractCamelContext extends BaseService
 
     protected abstract ExchangeFactoryManager createExchangeFactoryManager();
 
+    protected abstract ProcessorExchangeFactory createProcessorExchangeFactory();
+
     protected abstract HealthCheckRegistry createHealthCheckRegistry();
 
     protected abstract ReactiveExecutor createReactiveExecutor();
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
new file mode 100644
index 0000000..0b10211
--- /dev/null
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.PooledObjectFactorySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link org.apache.camel.spi.ProcessorExchangeFactory} that creates a new {@link Exchange} instance.
+ */
+public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySupport<Exchange>
+        implements ProcessorExchangeFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrototypeProcessorExchangeFactory.class);
+
+    final Processor processor;
+    String routeId;
+    String id;
+
+    public PrototypeProcessorExchangeFactory() {
+        this.processor = null;
+    }
+
+    public PrototypeProcessorExchangeFactory(Processor processor) {
+        this.processor = processor;
+    }
+
+    @Override
+    public String getRouteId() {
+        return routeId;
+    }
+
+    @Override
+    public void setRouteId(String routeId) {
+        this.routeId = routeId;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public Processor getProcessor() {
+        return processor;
+    }
+
+    @Override
+    public ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor) {
+        PrototypeProcessorExchangeFactory answer = new PrototypeProcessorExchangeFactory(processor);
+        answer.setStatisticsEnabled(statisticsEnabled);
+        answer.setCapacity(capacity);
+        answer.setCamelContext(camelContext);
+        return answer;
+    }
+
+    @Override
+    public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
+        return ExchangeHelper.createCorrelatedCopy(exchange, handover);
+    }
+
+    @Override
+    public Exchange acquire() {
+        throw new UnsupportedOperationException("Not in use");
+    }
+
+    @Override
+    public boolean release(Exchange exchange) {
+        if (statisticsEnabled) {
+            statistics.released.increment();
+        }
+        return true;
+    }
+
+    @Override
+    public boolean isPooled() {
+        return false;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        logUsageSummary(LOG, "PrototypeProcessorExchangeFactory", 0);
+    }
+
+    void logUsageSummary(Logger log, String name, int pooled) {
+        if (statisticsEnabled && processor != null) {
+            // only log if there is any usage
+            long created = statistics.getCreatedCounter();
+            long acquired = statistics.getAcquiredCounter();
+            long released = statistics.getReleasedCounter();
+            long discarded = statistics.getDiscardedCounter();
+            boolean shouldLog = pooled > 0 || created > 0 || acquired > 0 || released > 0 || discarded > 0;
+            if (shouldLog) {
+                String rid = getRouteId();
+                String pid = getId();
+
+                // are there any leaks?
+                boolean leak = created + acquired > released + discarded;
+                if (leak) {
+                    long leaks = (created + acquired) - (released + discarded);
+                    log.info(
+                            "{} {} ({}) usage (leaks detected: {}) [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]",
+                            name, rid, pid, leaks, pooled, created, acquired, released, discarded);
+                } else {
+                    log.info("{} {} ({}) usage [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]",
+                            name, rid, pid, pooled, created, acquired, released, discarded);
+                }
+            }
+        }
+    }
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index 3c46c48..254fc2d 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -59,6 +59,7 @@ import org.apache.camel.spi.ModelToXMLDumper;
 import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.ReactiveExecutor;
@@ -574,6 +575,17 @@ public class SimpleCamelContext extends AbstractCamelContext {
     }
 
     @Override
+    protected ProcessorExchangeFactory createProcessorExchangeFactory() {
+        Optional<ProcessorExchangeFactory> result = ResolverHelper.resolveService(
+                getCamelContextReference(),
+                getBootstrapFactoryFinder(),
+                ProcessorExchangeFactory.FACTORY,
+                ProcessorExchangeFactory.class);
+
+        return result.orElseGet(PrototypeProcessorExchangeFactory::new);
+    }
+
+    @Override
     protected ReactiveExecutor createReactiveExecutor() {
         Optional<ReactiveExecutor> result = ResolverHelper.resolveService(
                 getCamelContextReference(),
diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index d06c43d..9d5d73a 100644
--- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -121,6 +121,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "PackageScanClassResolver": target.setPackageScanClassResolver(property(camelContext, org.apache.camel.spi.PackageScanClassResolver.class, value)); return true;
         case "packagescanresourceresolver":
         case "PackageScanResourceResolver": target.setPackageScanResourceResolver(property(camelContext, org.apache.camel.spi.PackageScanResourceResolver.class, value)); return true;
+        case "processorexchangefactory":
+        case "ProcessorExchangeFactory": target.setProcessorExchangeFactory(property(camelContext, org.apache.camel.spi.ProcessorExchangeFactory.class, value)); return true;
         case "processorfactory":
         case "ProcessorFactory": target.setProcessorFactory(property(camelContext, org.apache.camel.spi.ProcessorFactory.class, value)); return true;
         case "propertiescomponent":
@@ -294,6 +296,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "PackageScanClassResolver": return org.apache.camel.spi.PackageScanClassResolver.class;
         case "packagescanresourceresolver":
         case "PackageScanResourceResolver": return org.apache.camel.spi.PackageScanResourceResolver.class;
+        case "processorexchangefactory":
+        case "ProcessorExchangeFactory": return org.apache.camel.spi.ProcessorExchangeFactory.class;
         case "processorfactory":
         case "ProcessorFactory": return org.apache.camel.spi.ProcessorFactory.class;
         case "propertiescomponent":
@@ -468,6 +472,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "PackageScanClassResolver": return target.getPackageScanClassResolver();
         case "packagescanresourceresolver":
         case "PackageScanResourceResolver": return target.getPackageScanResourceResolver();
+        case "processorexchangefactory":
+        case "ProcessorExchangeFactory": return target.getProcessorExchangeFactory();
         case "processorfactory":
         case "ProcessorFactory": return target.getProcessorFactory();
         case "propertiescomponent":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
index 6101ee5..626d4cf 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
@@ -114,6 +114,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.ReactiveExecutor;
@@ -1469,6 +1470,16 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam
     }
 
     @Override
+    public ProcessorExchangeFactory getProcessorExchangeFactory() {
+        return getExtendedCamelContext().getProcessorExchangeFactory();
+    }
+
+    @Override
+    public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) {
+        getExtendedCamelContext().setProcessorExchangeFactory(processorExchangeFactory);
+    }
+
+    @Override
     public ReactiveExecutor getReactiveExecutor() {
         return getExtendedCamelContext().getReactiveExecutor();
     }
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
index af98c56..d70e23d 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
@@ -110,6 +110,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.ReactiveExecutor;
@@ -171,6 +172,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
     private final HeadersMapFactory headersMapFactory;
     private final ExchangeFactory exchangeFactory;
     private final ExchangeFactoryManager exchangeFactoryManager;
+    private final ProcessorExchangeFactory processorExchangeFactory;
     private final ReactiveExecutor reactiveExecutor;
     private final AsyncProcessorAwaitManager asyncProcessorAwaitManager;
     private final ExecutorServiceManager executorServiceManager;
@@ -217,6 +219,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
         headersMapFactory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory();
         exchangeFactory = context.adapt(ExtendedCamelContext.class).getExchangeFactory();
         exchangeFactoryManager = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
+        processorExchangeFactory = context.adapt(ExtendedCamelContext.class).getProcessorExchangeFactory();
         reactiveExecutor = context.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         asyncProcessorAwaitManager = context.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         executorServiceManager = context.getExecutorServiceManager();
@@ -1578,6 +1581,16 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
     }
 
     @Override
+    public ProcessorExchangeFactory getProcessorExchangeFactory() {
+        return processorExchangeFactory;
+    }
+
+    @Override
+    public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public ReactiveExecutor getReactiveExecutor() {
         return reactiveExecutor;
     }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index b6c7822..8cf889b 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -39,12 +39,12 @@ import org.apache.camel.StreamCache;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -67,26 +67,28 @@ public class WireTapProcessor extends AsyncProcessorSupport
     private final Processor processor;
     private final AsyncProcessor asyncProcessor;
     private final ExchangePattern exchangePattern;
+    private final boolean copy;
     private final ExecutorService executorService;
     private volatile boolean shutdownExecutorService;
     private final LongAdder taskCount = new LongAdder();
+    private ProcessorExchangeFactory processorExchangeFactory;
     private PooledExchangeTaskFactory taskFactory;
 
     // expression or processor used for populating a new exchange to send
     // as opposed to traditional wiretap that sends a copy of the original exchange
     private Expression newExchangeExpression;
     private List<Processor> newExchangeProcessors;
-    private boolean copy;
     private Processor onPrepare;
 
     public WireTapProcessor(SendDynamicProcessor dynamicSendProcessor, Processor processor, String uri,
-                            ExchangePattern exchangePattern,
+                            ExchangePattern exchangePattern, boolean copy,
                             ExecutorService executorService, boolean shutdownExecutorService, boolean dynamicUri) {
         this.dynamicSendProcessor = dynamicSendProcessor;
         this.uri = uri;
         this.processor = processor;
         this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
         this.exchangePattern = exchangePattern;
+        this.copy = copy;
         ObjectHelper.notNull(executorService, "executorService");
         this.executorService = executorService;
         this.shutdownExecutorService = shutdownExecutorService;
@@ -101,6 +103,9 @@ public class WireTapProcessor extends AsyncProcessorSupport
             public void done(boolean doneSync) {
                 taskCount.decrement();
                 taskFactory.release(WireTapTask.this);
+                if (processorExchangeFactory != null) {
+                    processorExchangeFactory.release(exchange);
+                }
             }
         };
 
@@ -272,7 +277,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
 
     private Exchange configureCopyExchange(Exchange exchange) {
         // must use a copy as we dont want it to cause side effects of the original exchange
-        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+        Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
         // set MEP to InOnly as this wire tap is a fire and forget
         copy.setPattern(ExchangePattern.InOnly);
         // remove STREAM_CACHE_UNIT_OF_WORK property because this wire tap will
@@ -282,6 +287,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
     }
 
     private Exchange configureNewExchange(Exchange exchange) {
+        // no copy, so always create a new exchange
         return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
     }
 
@@ -312,10 +318,6 @@ public class WireTapProcessor extends AsyncProcessorSupport
         return copy;
     }
 
-    public void setCopy(boolean copy) {
-        this.copy = copy;
-    }
-
     public Processor getOnPrepare() {
         return onPrepare;
     }
@@ -350,6 +352,12 @@ public class WireTapProcessor extends AsyncProcessorSupport
 
     @Override
     protected void doBuild() throws Exception {
+        if (copy) {
+            // create a per processor exchange factory
+            this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
+                    .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+        }
+
         boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
         if (pooled) {
             taskFactory = new PooledTaskFactory(getId()) {
@@ -370,27 +378,27 @@ public class WireTapProcessor extends AsyncProcessorSupport
         }
         LOG.trace("Using TaskFactory: {}", taskFactory);
 
-        ServiceHelper.buildService(taskFactory, processor);
+        ServiceHelper.buildService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
     protected void doInit() throws Exception {
-        ServiceHelper.initService(taskFactory, processor);
+        ServiceHelper.initService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
     protected void doStart() throws Exception {
-        ServiceHelper.startService(taskFactory, processor);
+        ServiceHelper.startService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(taskFactory, processor);
+        ServiceHelper.stopService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(taskFactory, processor);
+        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, processor);
         if (shutdownExecutorService) {
             getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
         }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index 3d0c214..23a7508 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -84,9 +84,8 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
 
         WireTapProcessor answer = new WireTapProcessor(
                 dynamicSendProcessor, target, uri,
-                parse(ExchangePattern.class, definition.getPattern()),
+                parse(ExchangePattern.class, definition.getPattern()), isCopy,
                 threadPool, shutdownThreadPool, dynamic);
-        answer.setCopy(isCopy);
         Processor newExchangeProcessor = definition.getNewExchangeProcessor();
         String ref = parseString(definition.getNewExchangeProcessorRef());
         if (ref != null) {