You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/09 11:42:26 UTC
[camel] 01/02: CAMEL-16462: camel-core - Optimize Splitter EIP to
reduce object allocations.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6011aa0bb482efa4a4a12db94b05f6e4da881275
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Apr 9 13:14:31 2021 +0200
CAMEL-16462: camel-core - Optimize Splitter EIP to reduce object allocations.
---
.../apache/camel/spi/ProcessorExchangeFactory.java | 9 +++-
.../engine/PooledProcessorExchangeFactory.java | 50 ++++++++++++++++------
.../engine/PrototypeProcessorExchangeFactory.java | 5 +++
.../apache/camel/processor/MulticastProcessor.java | 9 +---
.../java/org/apache/camel/processor/Splitter.java | 19 +++++---
5 files changed, 65 insertions(+), 27 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
index c48ad15..214635f 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
@@ -57,7 +57,14 @@ public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>,
/**
* Gets a copy of the given {@link Exchange}
*
- * @param exchange original copy of the exchange
+ * @param exchange original exchange
+ */
+ Exchange createCopy(Exchange exchange);
+
+ /**
+ * Gets a copy of the given {@link Exchange} where the copy is correlated to the source
+ *
+ * @param exchange original exchange
* @param handover whether the on completion callbacks should be handed over to the new copy.
*/
Exchange createCorrelatedCopy(Exchange exchange, boolean handover);
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
index 1826659..fe3c1d2 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -58,26 +58,36 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
}
@Override
- public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
- Exchange answer = pool.poll();
+ public Exchange createCopy(Exchange exchange) {
+ ExtendedExchange answer = (ExtendedExchange) pool.poll();
if (answer == null) {
+ if (statisticsEnabled) {
+ statistics.created.increment();
+ }
// create a new exchange as there was no free from the pool
- PooledExchange pe = new DefaultPooledExchange(exchange);
- ExchangeHelper.copyResults(pe, exchange);
- // do not reuse message id on copy
- pe.getIn().setMessageId(null);
- // do not share the unit of work
- pe.setUnitOfWork(null);
- if (handover) {
- // Need to hand over the completion for async invocation
- pe.adapt(ExtendedExchange.class).handoverCompletions(exchange);
+ answer = new DefaultPooledExchange(exchange);
+ } else {
+ if (statisticsEnabled) {
+ statistics.acquired.increment();
}
- // set a correlation id so we can track back the original exchange
- pe.setProperty(ExchangePropertyKey.CORRELATION_ID, exchange.getExchangeId());
+ // reset exchange for reuse
+ PooledExchange ee = (PooledExchange) answer;
+ ee.reset(System.currentTimeMillis());
+ }
+
+ ExchangeHelper.copyResults(answer, exchange);
+ return answer;
+ }
+
+ @Override
+ public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
+ ExtendedExchange answer = (ExtendedExchange) pool.poll();
+ if (answer == null) {
if (statisticsEnabled) {
statistics.created.increment();
}
- answer = pe;
+ // create a new exchange as there was no free from the pool
+ answer = new DefaultPooledExchange(exchange);
} else {
if (statisticsEnabled) {
statistics.acquired.increment();
@@ -86,6 +96,18 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
PooledExchange ee = (PooledExchange) answer;
ee.reset(System.currentTimeMillis());
}
+
+ ExchangeHelper.copyResults(answer, exchange);
+ // do not reuse message id on copy
+ answer.getIn().setMessageId(null);
+ // do not share the unit of work
+ answer.setUnitOfWork(null);
+ if (handover) {
+ // Need to hand over the completion for async invocation
+ answer.handoverCompletions(exchange);
+ }
+ // set a correlation id so we can track back the original exchange
+ answer.setProperty(ExchangePropertyKey.CORRELATION_ID, exchange.getExchangeId());
return answer;
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
index 3a4a1f1..65de8f4 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
@@ -81,6 +81,11 @@ public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySuppor
}
@Override
+ public Exchange createCopy(Exchange exchange) {
+ return exchange.copy();
+ }
+
+ @Override
public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
return ExchangeHelper.createCorrelatedCopy(exchange, handover);
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 39e59a7..199a392 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -215,13 +215,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
this.shareUnitOfWork = shareUnitOfWork;
this.parallelAggregate = parallelAggregate;
this.stopOnAggregateException = stopOnAggregateException;
- if (this instanceof Splitter) {
- // not supported for splitter
- this.processorExchangeFactory = null;
- } else {
- this.processorExchangeFactory = camelContext.adapt(ExtendedCamelContext.class)
- .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
- }
+ this.processorExchangeFactory = camelContext.adapt(ExtendedCamelContext.class)
+ .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
}
@Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 64fd105..a90091d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -190,7 +190,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
// this avoids any side effect reflected upon the incoming exchange
final Object value;
final Iterator<?> iterator;
- private final Exchange copy;
+ private Exchange copy;
private final Route route;
private final Exchange original;
@@ -251,7 +251,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
if (part != null) {
// create a correlated copy as the new exchange to be routed in the splitter from the copy
// and do not share the unit of work
- Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
+ Exchange newExchange = processorExchangeFactory.createCorrelatedCopy(copy, false);
// If the splitter has an aggregation strategy
// then the StreamCache created by the child routes must not be
// closed by the unit of work of the child route, but by the unit of
@@ -284,7 +284,12 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
@Override
public void close() throws IOException {
- IOHelper.closeIterator(value);
+ if (copy != null) {
+ processorExchangeFactory.release(copy);
+ // null copy to avoid releasing it back again as close may be called multiple times
+ copy = null;
+ IOHelper.closeIterator(value);
+ }
}
}
@@ -337,8 +342,12 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
return expression;
}
- private static Exchange copyAndPrepareSubExchange(Exchange exchange, boolean preserveExchangeId) {
- Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId);
+ private Exchange copyAndPrepareSubExchange(Exchange exchange, boolean preserveExchangeId) {
+ Exchange answer = processorExchangeFactory.createCopy(exchange);
+ if (preserveExchangeId) {
+ // must preserve exchange id
+ answer.setExchangeId(exchange.getExchangeId());
+ }
if (exchange.getContext().isMessageHistory()) {
// we do not want to copy the message history for splitted sub-messages
answer.removeProperty(ExchangePropertyKey.MESSAGE_HISTORY);