You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/01 11:02:09 UTC

[camel] 03/03: CAMEL-14354: camel-core optimize. Reduce object allocations for lambdas in pipeline.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d18688cda33a3176751891c3cc6f0a0f304c57ef
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 1 11:38:37 2020 +0100

    CAMEL-14354: camel-core optimize. Reduce object allocations for lambdas in pipeline.
---
 .../java/org/apache/camel/processor/Pipeline.java  | 31 ++++++++++++++++++----
 1 file changed, 26 insertions(+), 5 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 7840875..ee01a49 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -40,6 +40,7 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
 
 /**
@@ -57,6 +58,24 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
     private String id;
     private String routeId;
 
+    private final class PipelineTask implements Runnable {
+
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+        private final AtomicInteger index;
+
+        PipelineTask(Exchange exchange, AsyncCallback callback, AtomicInteger index) {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.index = index;
+        }
+
+        @Override
+        public void run() {
+            doProcess(this, exchange, callback, index);
+        }
+    }
+
     public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
         this.camelContext = camelContext;
         this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
@@ -92,15 +111,18 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        // create task which has state used during routing
+        PipelineTask task = new PipelineTask(exchange, callback, new AtomicInteger());
+
         if (exchange.isTransacted()) {
-            reactiveExecutor.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, size, new AtomicInteger()));
+            reactiveExecutor.scheduleSync(task);
         } else {
-            reactiveExecutor.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, size, new AtomicInteger()));
+            reactiveExecutor.scheduleMain(task);
         }
         return false;
     }
 
-    protected void doProcess(Exchange exchange, AsyncCallback callback, List<AsyncProcessor> processors, int size, AtomicInteger index) {
+    protected void doProcess(PipelineTask task, Exchange exchange, AsyncCallback callback, AtomicInteger index) {
         // optimize to use an atomic index counter for tracking how long we are in the processors list (uses less memory than iterator on array list)
 
         boolean stop = exchange.isRouteStop();
@@ -119,8 +141,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             // get the next processor
             AsyncProcessor processor = processors.get(index.getAndIncrement());
 
-            processor.process(exchange, doneSync ->
-                    reactiveExecutor.schedule(() -> doProcess(exchange, callback, processors, size, index)));
+            processor.process(exchange, doneSync -> reactiveExecutor.schedule(task));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);