You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/01 11:02:09 UTC
[camel] 03/03: CAMEL-14354: camel-core optimize. Reduce object
allocations for lambdas in pipeline.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit d18688cda33a3176751891c3cc6f0a0f304c57ef
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 1 11:38:37 2020 +0100
CAMEL-14354: camel-core optimize. Reduce object allocations for lambdas in pipeline.
---
.../java/org/apache/camel/processor/Pipeline.java | 31 ++++++++++++++++++----
1 file changed, 26 insertions(+), 5 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 7840875..ee01a49 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -40,6 +40,7 @@ import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import static org.apache.camel.processor.PipelineHelper.continueProcessing;
/**
@@ -57,6 +58,24 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
private String id;
private String routeId;
+ private final class PipelineTask implements Runnable {
+
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+ private final AtomicInteger index;
+
+ PipelineTask(Exchange exchange, AsyncCallback callback, AtomicInteger index) {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.index = index;
+ }
+
+ @Override
+ public void run() {
+ doProcess(this, exchange, callback, index);
+ }
+ }
+
public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
this.camelContext = camelContext;
this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
@@ -92,15 +111,18 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
+ // create task which has state used during routing
+ PipelineTask task = new PipelineTask(exchange, callback, new AtomicInteger());
+
if (exchange.isTransacted()) {
- reactiveExecutor.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, size, new AtomicInteger()));
+ reactiveExecutor.scheduleSync(task);
} else {
- reactiveExecutor.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, size, new AtomicInteger()));
+ reactiveExecutor.scheduleMain(task);
}
return false;
}
- protected void doProcess(Exchange exchange, AsyncCallback callback, List<AsyncProcessor> processors, int size, AtomicInteger index) {
+ protected void doProcess(PipelineTask task, Exchange exchange, AsyncCallback callback, AtomicInteger index) {
// optimize to use an atomic index counter for tracking how long we are in the processors list (uses less memory than iterator on array list)
boolean stop = exchange.isRouteStop();
@@ -119,8 +141,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
// get the next processor
AsyncProcessor processor = processors.get(index.getAndIncrement());
- processor.process(exchange, doneSync ->
- reactiveExecutor.schedule(() -> doProcess(exchange, callback, processors, size, index)));
+ processor.process(exchange, doneSync -> reactiveExecutor.schedule(task));
} else {
ExchangeHelper.copyResults(exchange, exchange);