You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/02 18:19:12 UTC
[camel] 04/05: CAMEL-14354: camel-core - Optimize pipeline to have
work in run method instead of doProcess
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8560972de447ee91ac36f7005de8ac1976ebf4ff
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 2 19:04:57 2020 +0100
CAMEL-14354: camel-core - Optimize pipeline to have work in run method instead of doProcess
---
.../java/org/apache/camel/processor/Pipeline.java | 64 ++++++++++------------
1 file changed, 29 insertions(+), 35 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index d39b02f..f36ae0d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -71,7 +71,35 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
@Override
public void run() {
- doProcess(this, exchange, callback, index);
+ boolean stop = exchange.isRouteStop();
+ int num = index.get();
+ boolean more = num < size;
+ boolean first = num == 0;
+
+ if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
+
+ // prepare for next run
+ if (exchange.hasOut()) {
+ exchange.setIn(exchange.getOut());
+ exchange.setOut(null);
+ }
+
+ // get the next processor
+ AsyncProcessor processor = processors.get(index.getAndIncrement());
+
+ processor.process(exchange, doneSync -> reactiveExecutor.schedule(this));
+ } else {
+ ExchangeHelper.copyResults(exchange, exchange);
+
+ // logging nextExchange as it contains the exchange that might have altered the payload and since
+ // we are logging the completion if will be confusing if we log the original instead
+ // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+ }
+
+ reactiveExecutor.schedule(callback);
+ }
}
}
@@ -121,40 +149,6 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
return false;
}
- protected void doProcess(PipelineTask task, Exchange exchange, AsyncCallback callback, AtomicInteger index) {
- // optimize to use an atomic index counter for tracking how long we are in the processors list (uses less memory than iterator on array list)
-
- boolean stop = exchange.isRouteStop();
- int num = index.get();
- boolean more = num < size;
- boolean first = num == 0;
-
- if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
-
- // prepare for next run
- if (exchange.hasOut()) {
- exchange.setIn(exchange.getOut());
- exchange.setOut(null);
- }
-
- // get the next processor
- AsyncProcessor processor = processors.get(index.getAndIncrement());
-
- processor.process(exchange, doneSync -> reactiveExecutor.schedule(task));
- } else {
- ExchangeHelper.copyResults(exchange, exchange);
-
- // logging nextExchange as it contains the exchange that might have altered the payload and since
- // we are logging the completion if will be confusing if we log the original instead
- // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
- }
-
- reactiveExecutor.schedule(callback);
- }
- }
-
@Override
protected void doStart() throws Exception {
ServiceHelper.startService(processors);