You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/05 16:47:09 UTC
[camel] 01/05: CAMEL-16451: camel-core - ExchangePooling for EIPs.
Wiretap EIP
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 59c8d801a254879e9b72745bc16933023a5682c6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 5 16:03:59 2021 +0200
CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
---
.../org/apache/camel/ExtendedCamelContext.java | 11 ++
.../java/org/apache/camel/spi/ExchangeFactory.java | 3 +
.../apache/camel/spi/ProcessorExchangeFactory.java | 71 +++++++++++
.../camel/impl/engine/AbstractCamelContext.java | 24 ++++
.../engine/PrototypeProcessorExchangeFactory.java | 136 +++++++++++++++++++++
.../camel/impl/engine/SimpleCamelContext.java | 12 ++
.../camel/impl/ExtendedCamelContextConfigurer.java | 6 +
.../camel/impl/lw/LightweightCamelContext.java | 11 ++
.../impl/lw/LightweightRuntimeCamelContext.java | 13 ++
.../apache/camel/processor/WireTapProcessor.java | 34 ++++--
.../org/apache/camel/reifier/WireTapReifier.java | 3 +-
11 files changed, 309 insertions(+), 15 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 654cc52..b105f47 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -54,6 +54,7 @@ import org.apache.camel.spi.NodeIdFactory;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
import org.apache.camel.spi.ProcessorFactory;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.Registry;
@@ -237,6 +238,16 @@ public interface ExtendedCamelContext extends CamelContext {
void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager);
/**
+ * Gets the processor exchange factory to use.
+ */
+ ProcessorExchangeFactory getProcessorExchangeFactory();
+
+ /**
+ * Sets a custom processor exchange factory to use.
+ */
+ void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory);
+
+ /**
* Returns the bean post processor used to do any bean customization.
*
* @return the bean post processor.
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 75b9d21..83a5e04 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -33,6 +33,9 @@ import org.apache.camel.NonManagedService;
* <p/>
* The factory is pluggable which allows to use different strategies. The default factory will create a new
* {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
+ *
+ * @see ProcessorExchangeFactory
+ * @see org.apache.camel.PooledExchange
*/
public interface ExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware {
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
new file mode 100644
index 0000000..77142d4
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Processor;
+
+/**
+ * Factory used by {@link org.apache.camel.Processor} (EIPs) when they create copies of the processed {@link Exchange}.
+ * <p/>
+ * Some EIPs like WireTap, Multicast, Split etc. create copies of the processed exchange which they use as sub
+ * exchanges. This factory allows the use of exchange pooling for those copies.
+ * <p/>
+ * The factory is pluggable which allows to use different strategies. The default factory will create a new
+ * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
+ *
+ * @see ExchangeFactory
+ * @see org.apache.camel.PooledExchange
+ */
+public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware, IdAware {
+
+ /**
+ * Service factory key.
+ */
+ String FACTORY = "processor-exchange-factory";
+
+ /**
+ * The processor using this factory.
+ */
+ Processor getProcessor();
+
+ /**
+ * Creates a new {@link ProcessorExchangeFactory} that is private for the given processor.
+ *
+ * @param processor the processor that will use the created {@link ProcessorExchangeFactory}
+ * @return the created factory.
+ */
+ ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor);
+
+ /**
+ * Creates a copy of the given {@link Exchange}
+ *
+ * @param exchange the original exchange to create a correlated copy of
+ * @param handover whether the on completion callbacks should be handed over to the new copy.
+ */
+ Exchange createCorrelatedCopy(Exchange exchange, boolean handover);
+
+ /**
+ * Releases the exchange back into the pool
+ *
+ * @param exchange the exchange
+ * @return true if released into the pool, or false if something went wrong and the exchange was discarded
+ */
+ boolean release(Exchange exchange);
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index afa7832..c1d8fd1 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -130,6 +130,7 @@ import org.apache.camel.spi.NodeIdFactory;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
import org.apache.camel.spi.ProcessorFactory;
import org.apache.camel.spi.PropertiesComponent;
import org.apache.camel.spi.ReactiveExecutor;
@@ -271,6 +272,7 @@ public abstract class AbstractCamelContext extends BaseService
private volatile CamelContextNameStrategy nameStrategy;
private volatile ExchangeFactoryManager exchangeFactoryManager;
private volatile ExchangeFactory exchangeFactory;
+ private volatile ProcessorExchangeFactory processorExchangeFactory;
private volatile ReactiveExecutor reactiveExecutor;
private volatile ManagementNameStrategy managementNameStrategy;
private volatile Registry registry;
@@ -3701,6 +3703,7 @@ public abstract class AbstractCamelContext extends BaseService
asyncProcessorAwaitManager = null;
exchangeFactory = null;
exchangeFactoryManager = null;
+ processorExchangeFactory = null;
registry = null;
}
@@ -4715,6 +4718,25 @@ public abstract class AbstractCamelContext extends BaseService
}
@Override
+ public ProcessorExchangeFactory getProcessorExchangeFactory() {
+ if (processorExchangeFactory == null) {
+ synchronized (lock) {
+ if (processorExchangeFactory == null) {
+ setProcessorExchangeFactory(createProcessorExchangeFactory());
+ }
+ }
+ }
+ return processorExchangeFactory;
+ }
+
+ @Override
+ public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) {
+ // automatic inject camel context
+ processorExchangeFactory.setCamelContext(this);
+ this.processorExchangeFactory = processorExchangeFactory;
+ }
+
+ @Override
public ReactiveExecutor getReactiveExecutor() {
if (reactiveExecutor == null) {
synchronized (lock) {
@@ -4810,6 +4832,8 @@ public abstract class AbstractCamelContext extends BaseService
protected abstract ExchangeFactoryManager createExchangeFactoryManager();
+ protected abstract ProcessorExchangeFactory createProcessorExchangeFactory();
+
protected abstract HealthCheckRegistry createHealthCheckRegistry();
protected abstract ReactiveExecutor createReactiveExecutor();
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
new file mode 100644
index 0000000..0b10211
--- /dev/null
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.PooledObjectFactorySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link org.apache.camel.spi.ProcessorExchangeFactory} that creates a new {@link Exchange} instance.
+ */
+public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySupport<Exchange>
+ implements ProcessorExchangeFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PrototypeProcessorExchangeFactory.class);
+
+ final Processor processor;
+ String routeId;
+ String id;
+
+ public PrototypeProcessorExchangeFactory() {
+ this.processor = null;
+ }
+
+ public PrototypeProcessorExchangeFactory(Processor processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Processor getProcessor() {
+ return processor;
+ }
+
+ @Override
+ public ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor) {
+ PrototypeProcessorExchangeFactory answer = new PrototypeProcessorExchangeFactory(processor);
+ answer.setStatisticsEnabled(statisticsEnabled);
+ answer.setCapacity(capacity);
+ answer.setCamelContext(camelContext);
+ return answer;
+ }
+
+ @Override
+ public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
+ return ExchangeHelper.createCorrelatedCopy(exchange, handover);
+ }
+
+ @Override
+ public Exchange acquire() {
+ throw new UnsupportedOperationException("Not in use");
+ }
+
+ @Override
+ public boolean release(Exchange exchange) {
+ if (statisticsEnabled) {
+ statistics.released.increment();
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isPooled() {
+ return false;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ logUsageSummary(LOG, "PrototypeProcessorExchangeFactory", 0);
+ }
+
+ void logUsageSummary(Logger log, String name, int pooled) {
+ if (statisticsEnabled && processor != null) {
+ // only log if there is any usage
+ long created = statistics.getCreatedCounter();
+ long acquired = statistics.getAcquiredCounter();
+ long released = statistics.getReleasedCounter();
+ long discarded = statistics.getDiscardedCounter();
+ boolean shouldLog = pooled > 0 || created > 0 || acquired > 0 || released > 0 || discarded > 0;
+ if (shouldLog) {
+ String rid = getRouteId();
+ String pid = getId();
+
+ // are there any leaks?
+ boolean leak = created + acquired > released + discarded;
+ if (leak) {
+ long leaks = (created + acquired) - (released + discarded);
+ log.info(
+ "{} {} ({}) usage (leaks detected: {}) [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]",
+ name, rid, pid, leaks, pooled, created, acquired, released, discarded);
+ } else {
+ log.info("{} {} ({}) usage [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]",
+ name, rid, pid, pooled, created, acquired, released, discarded);
+ }
+ }
+ }
+ }
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index 3c46c48..254fc2d 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -59,6 +59,7 @@ import org.apache.camel.spi.ModelToXMLDumper;
import org.apache.camel.spi.NodeIdFactory;
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
import org.apache.camel.spi.ProcessorFactory;
import org.apache.camel.spi.PropertiesComponent;
import org.apache.camel.spi.ReactiveExecutor;
@@ -574,6 +575,17 @@ public class SimpleCamelContext extends AbstractCamelContext {
}
@Override
+ protected ProcessorExchangeFactory createProcessorExchangeFactory() {
+ Optional<ProcessorExchangeFactory> result = ResolverHelper.resolveService(
+ getCamelContextReference(),
+ getBootstrapFactoryFinder(),
+ ProcessorExchangeFactory.FACTORY,
+ ProcessorExchangeFactory.class);
+
+ return result.orElseGet(PrototypeProcessorExchangeFactory::new);
+ }
+
+ @Override
protected ReactiveExecutor createReactiveExecutor() {
Optional<ReactiveExecutor> result = ResolverHelper.resolveService(
getCamelContextReference(),
diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index d06c43d..9d5d73a 100644
--- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -121,6 +121,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
case "PackageScanClassResolver": target.setPackageScanClassResolver(property(camelContext, org.apache.camel.spi.PackageScanClassResolver.class, value)); return true;
case "packagescanresourceresolver":
case "PackageScanResourceResolver": target.setPackageScanResourceResolver(property(camelContext, org.apache.camel.spi.PackageScanResourceResolver.class, value)); return true;
+ case "processorexchangefactory":
+ case "ProcessorExchangeFactory": target.setProcessorExchangeFactory(property(camelContext, org.apache.camel.spi.ProcessorExchangeFactory.class, value)); return true;
case "processorfactory":
case "ProcessorFactory": target.setProcessorFactory(property(camelContext, org.apache.camel.spi.ProcessorFactory.class, value)); return true;
case "propertiescomponent":
@@ -294,6 +296,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
case "PackageScanClassResolver": return org.apache.camel.spi.PackageScanClassResolver.class;
case "packagescanresourceresolver":
case "PackageScanResourceResolver": return org.apache.camel.spi.PackageScanResourceResolver.class;
+ case "processorexchangefactory":
+ case "ProcessorExchangeFactory": return org.apache.camel.spi.ProcessorExchangeFactory.class;
case "processorfactory":
case "ProcessorFactory": return org.apache.camel.spi.ProcessorFactory.class;
case "propertiescomponent":
@@ -468,6 +472,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
case "PackageScanClassResolver": return target.getPackageScanClassResolver();
case "packagescanresourceresolver":
case "PackageScanResourceResolver": return target.getPackageScanResourceResolver();
+ case "processorexchangefactory":
+ case "ProcessorExchangeFactory": return target.getProcessorExchangeFactory();
case "processorfactory":
case "ProcessorFactory": return target.getProcessorFactory();
case "propertiescomponent":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
index 6101ee5..626d4cf 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
@@ -114,6 +114,7 @@ import org.apache.camel.spi.NodeIdFactory;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
import org.apache.camel.spi.ProcessorFactory;
import org.apache.camel.spi.PropertiesComponent;
import org.apache.camel.spi.ReactiveExecutor;
@@ -1469,6 +1470,16 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam
}
@Override
+ public ProcessorExchangeFactory getProcessorExchangeFactory() {
+ return getExtendedCamelContext().getProcessorExchangeFactory();
+ }
+
+ @Override
+ public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) {
+ getExtendedCamelContext().setProcessorExchangeFactory(processorExchangeFactory);
+ }
+
+ @Override
public ReactiveExecutor getReactiveExecutor() {
return getExtendedCamelContext().getReactiveExecutor();
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
index af98c56..d70e23d 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
@@ -110,6 +110,7 @@ import org.apache.camel.spi.NodeIdFactory;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.PackageScanResourceResolver;
+import org.apache.camel.spi.ProcessorExchangeFactory;
import org.apache.camel.spi.ProcessorFactory;
import org.apache.camel.spi.PropertiesComponent;
import org.apache.camel.spi.ReactiveExecutor;
@@ -171,6 +172,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
private final HeadersMapFactory headersMapFactory;
private final ExchangeFactory exchangeFactory;
private final ExchangeFactoryManager exchangeFactoryManager;
+ private final ProcessorExchangeFactory processorExchangeFactory;
private final ReactiveExecutor reactiveExecutor;
private final AsyncProcessorAwaitManager asyncProcessorAwaitManager;
private final ExecutorServiceManager executorServiceManager;
@@ -217,6 +219,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
headersMapFactory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory();
exchangeFactory = context.adapt(ExtendedCamelContext.class).getExchangeFactory();
exchangeFactoryManager = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
+ processorExchangeFactory = context.adapt(ExtendedCamelContext.class).getProcessorExchangeFactory();
reactiveExecutor = context.adapt(ExtendedCamelContext.class).getReactiveExecutor();
asyncProcessorAwaitManager = context.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
executorServiceManager = context.getExecutorServiceManager();
@@ -1578,6 +1581,16 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
}
@Override
+ public ProcessorExchangeFactory getProcessorExchangeFactory() {
+ return processorExchangeFactory;
+ }
+
+ @Override
+ public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public ReactiveExecutor getReactiveExecutor() {
return reactiveExecutor;
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index b6c7822..8cf889b 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -39,12 +39,12 @@ import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProcessorExchangeFactory;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -67,26 +67,28 @@ public class WireTapProcessor extends AsyncProcessorSupport
private final Processor processor;
private final AsyncProcessor asyncProcessor;
private final ExchangePattern exchangePattern;
+ private final boolean copy;
private final ExecutorService executorService;
private volatile boolean shutdownExecutorService;
private final LongAdder taskCount = new LongAdder();
+ private ProcessorExchangeFactory processorExchangeFactory;
private PooledExchangeTaskFactory taskFactory;
// expression or processor used for populating a new exchange to send
// as opposed to traditional wiretap that sends a copy of the original exchange
private Expression newExchangeExpression;
private List<Processor> newExchangeProcessors;
- private boolean copy;
private Processor onPrepare;
public WireTapProcessor(SendDynamicProcessor dynamicSendProcessor, Processor processor, String uri,
- ExchangePattern exchangePattern,
+ ExchangePattern exchangePattern, boolean copy,
ExecutorService executorService, boolean shutdownExecutorService, boolean dynamicUri) {
this.dynamicSendProcessor = dynamicSendProcessor;
this.uri = uri;
this.processor = processor;
this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
this.exchangePattern = exchangePattern;
+ this.copy = copy;
ObjectHelper.notNull(executorService, "executorService");
this.executorService = executorService;
this.shutdownExecutorService = shutdownExecutorService;
@@ -101,6 +103,9 @@ public class WireTapProcessor extends AsyncProcessorSupport
public void done(boolean doneSync) {
taskCount.decrement();
taskFactory.release(WireTapTask.this);
+ if (processorExchangeFactory != null) {
+ processorExchangeFactory.release(exchange);
+ }
}
};
@@ -272,7 +277,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
private Exchange configureCopyExchange(Exchange exchange) {
// must use a copy as we dont want it to cause side effects of the original exchange
- Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+ Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
// set MEP to InOnly as this wire tap is a fire and forget
copy.setPattern(ExchangePattern.InOnly);
// remove STREAM_CACHE_UNIT_OF_WORK property because this wire tap will
@@ -282,6 +287,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
}
private Exchange configureNewExchange(Exchange exchange) {
+ // no copy, so let's always just create a new exchange
return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
}
@@ -312,10 +318,6 @@ public class WireTapProcessor extends AsyncProcessorSupport
return copy;
}
- public void setCopy(boolean copy) {
- this.copy = copy;
- }
-
public Processor getOnPrepare() {
return onPrepare;
}
@@ -350,6 +352,12 @@ public class WireTapProcessor extends AsyncProcessorSupport
@Override
protected void doBuild() throws Exception {
+ if (copy) {
+ // create a per processor exchange factory
+ this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
+ .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+ }
+
boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
if (pooled) {
taskFactory = new PooledTaskFactory(getId()) {
@@ -370,27 +378,27 @@ public class WireTapProcessor extends AsyncProcessorSupport
}
LOG.trace("Using TaskFactory: {}", taskFactory);
- ServiceHelper.buildService(taskFactory, processor);
+ ServiceHelper.buildService(processorExchangeFactory, taskFactory, processor);
}
@Override
protected void doInit() throws Exception {
- ServiceHelper.initService(taskFactory, processor);
+ ServiceHelper.initService(processorExchangeFactory, taskFactory, processor);
}
@Override
protected void doStart() throws Exception {
- ServiceHelper.startService(taskFactory, processor);
+ ServiceHelper.startService(processorExchangeFactory, taskFactory, processor);
}
@Override
protected void doStop() throws Exception {
- ServiceHelper.stopService(taskFactory, processor);
+ ServiceHelper.stopService(processorExchangeFactory, taskFactory, processor);
}
@Override
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(taskFactory, processor);
+ ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, processor);
if (shutdownExecutorService) {
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index 3d0c214..23a7508 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -84,9 +84,8 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
WireTapProcessor answer = new WireTapProcessor(
dynamicSendProcessor, target, uri,
- parse(ExchangePattern.class, definition.getPattern()),
+ parse(ExchangePattern.class, definition.getPattern()), isCopy,
threadPool, shutdownThreadPool, dynamic);
- answer.setCopy(isCopy);
Processor newExchangeProcessor = definition.getNewExchangeProcessor();
String ref = parseString(definition.getNewExchangeProcessorRef());
if (ref != null) {