You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/02 18:19:12 UTC

[camel] 04/05: CAMEL-14354: camel-core - Optimize pipeline to have work in run method instead of doProcess

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8560972de447ee91ac36f7005de8ac1976ebf4ff
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 2 19:04:57 2020 +0100

    CAMEL-14354: camel-core - Optimize pipeline to have work in run method instead of doProcess
---
 .../java/org/apache/camel/processor/Pipeline.java  | 64 ++++++++++------------
 1 file changed, 29 insertions(+), 35 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index d39b02f..f36ae0d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -71,7 +71,35 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
 
         @Override
         public void run() {
-            doProcess(this, exchange, callback, index);
+            boolean stop = exchange.isRouteStop();
+            int num = index.get();
+            boolean more = num < size;
+            boolean first = num == 0;
+
+            if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
+
+                // prepare for next run
+                if (exchange.hasOut()) {
+                    exchange.setIn(exchange.getOut());
+                    exchange.setOut(null);
+                }
+
+                // get the next processor
+                AsyncProcessor processor = processors.get(index.getAndIncrement());
+
+                processor.process(exchange, doneSync -> reactiveExecutor.schedule(this));
+            } else {
+                ExchangeHelper.copyResults(exchange, exchange);
+
+                // logging nextExchange as it contains the exchange that might have altered the payload and since
+                // we are logging the completion it will be confusing if we log the original instead
+                // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                }
+
+                reactiveExecutor.schedule(callback);
+            }
         }
     }
 
@@ -121,40 +149,6 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
         return false;
     }
 
-    protected void doProcess(PipelineTask task, Exchange exchange, AsyncCallback callback, AtomicInteger index) {
-        // optimize to use an atomic index counter for tracking how long we are in the processors list (uses less memory than iterator on array list)
-
-        boolean stop = exchange.isRouteStop();
-        int num = index.get();
-        boolean more = num < size;
-        boolean first = num == 0;
-
-        if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
-
-            // prepare for next run
-            if (exchange.hasOut()) {
-                exchange.setIn(exchange.getOut());
-                exchange.setOut(null);
-            }
-
-            // get the next processor
-            AsyncProcessor processor = processors.get(index.getAndIncrement());
-
-            processor.process(exchange, doneSync -> reactiveExecutor.schedule(task));
-        } else {
-            ExchangeHelper.copyResults(exchange, exchange);
-
-            // logging nextExchange as it contains the exchange that might have altered the payload and since
-            // we are logging the completion if will be confusing if we log the original instead
-            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-            }
-
-            reactiveExecutor.schedule(callback);
-        }
-    }
-
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(processors);