You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/02 18:19:08 UTC

[camel] branch master updated (f335870 -> d7241e1)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from f335870  camel-schematron: Correct duplicated id in sch-messages-fr.xhtml.
     new 773c8f6  Polished
     new 11d4eab  CAMEL-14354: camel-core optimize. Lets use a inner class for the after task instead of lambda so we have nicer strackrrace and can better understand them.
     new 2a74c86  CAMEL-14354: camel-core - Optimize to reduce calling method in critical path
     new 8560972  CAMEL-14354: camel-core - Optimize pipeline to have work in run method insted of doProcess
     new d7241e1  CAMEL-14354: camel-core - Optimize to reduce calling method in critical path

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/impl/engine/DefaultRouteContext.java     |   6 +-
 .../camel/processor/CamelInternalProcessor.java    | 124 ++++++++++++---------
 .../java/org/apache/camel/processor/Pipeline.java  |  65 +++++------
 3 files changed, 103 insertions(+), 92 deletions(-)


[camel] 03/05: CAMEL-14354: camel-core - Optimize to reduce calling method in critical path

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2a74c86ed69964ccf00a0e245d7aedd5fd560482
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 2 19:01:23 2020 +0100

    CAMEL-14354: camel-core - Optimize to reduce calling method in critical path
---
 .../camel/processor/CamelInternalProcessor.java    | 39 ++++++++--------------
 1 file changed, 14 insertions(+), 25 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 6b8265e..d447036 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -210,12 +210,25 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         // you can see in the code below.
         // ----------------------------------------------------------
 
-        if (processor == null || !continueProcessing(exchange)) {
+        if (processor == null || exchange.isRouteStop()) {
             // no processor or we should not continue then we are done
             originalCallback.done(true);
             return true;
         }
 
+        boolean forceShutdown = shutdownStrategy.forceShutdown(this);
+        if (forceShutdown) {
+            String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
+            LOG.debug(msg);
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException(msg));
+            }
+            // force shutdown so we should not continue
+            originalCallback.done(true);
+            return true;
+
+        }
+
         // optimise to use object array for states, and only for the number of advices that keep state
         final Object[] states = statefulAdvices > 0 ? new Object[statefulAdvices] : EMPTY_STATES;
         // optimise for loop using index access to avoid creating iterator object
@@ -299,30 +312,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
     }
 
     /**
-     * Strategy to determine if we should continue processing the {@link Exchange}.
-     */
-    private boolean continueProcessing(Exchange exchange) {
-        if (exchange.isRouteStop()) {
-            LOG.debug("Exchange is marked to stop routing: {}", exchange);
-            return false;
-        }
-
-        // determine if we can still run, or the camel context is forcing a shutdown
-        boolean forceShutdown = shutdownStrategy.forceShutdown(this);
-        if (forceShutdown) {
-            String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
-            LOG.debug(msg);
-            if (exchange.getException() == null) {
-                exchange.setException(new RejectedExecutionException(msg));
-            }
-            return false;
-        }
-
-        // yes we can continue
-        return true;
-    }
-
-    /**
      * Advice to invoke callbacks for before and after routing.
      */
     public static class RouteLifecycleAdvice implements CamelInternalProcessorAdvice<Object> {


[camel] 04/05: CAMEL-14354: camel-core - Optimize pipeline to have work in run method insted of doProcess

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8560972de447ee91ac36f7005de8ac1976ebf4ff
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 2 19:04:57 2020 +0100

    CAMEL-14354: camel-core - Optimize pipeline to have work in run method insted of doProcess
---
 .../java/org/apache/camel/processor/Pipeline.java  | 64 ++++++++++------------
 1 file changed, 29 insertions(+), 35 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index d39b02f..f36ae0d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -71,7 +71,35 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
 
         @Override
         public void run() {
-            doProcess(this, exchange, callback, index);
+            boolean stop = exchange.isRouteStop();
+            int num = index.get();
+            boolean more = num < size;
+            boolean first = num == 0;
+
+            if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
+
+                // prepare for next run
+                if (exchange.hasOut()) {
+                    exchange.setIn(exchange.getOut());
+                    exchange.setOut(null);
+                }
+
+                // get the next processor
+                AsyncProcessor processor = processors.get(index.getAndIncrement());
+
+                processor.process(exchange, doneSync -> reactiveExecutor.schedule(this));
+            } else {
+                ExchangeHelper.copyResults(exchange, exchange);
+
+                // logging nextExchange as it contains the exchange that might have altered the payload and since
+                // we are logging the completion if will be confusing if we log the original instead
+                // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                }
+
+                reactiveExecutor.schedule(callback);
+            }
         }
     }
 
@@ -121,40 +149,6 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
         return false;
     }
 
-    protected void doProcess(PipelineTask task, Exchange exchange, AsyncCallback callback, AtomicInteger index) {
-        // optimize to use an atomic index counter for tracking how long we are in the processors list (uses less memory than iterator on array list)
-
-        boolean stop = exchange.isRouteStop();
-        int num = index.get();
-        boolean more = num < size;
-        boolean first = num == 0;
-
-        if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
-
-            // prepare for next run
-            if (exchange.hasOut()) {
-                exchange.setIn(exchange.getOut());
-                exchange.setOut(null);
-            }
-
-            // get the next processor
-            AsyncProcessor processor = processors.get(index.getAndIncrement());
-
-            processor.process(exchange, doneSync -> reactiveExecutor.schedule(task));
-        } else {
-            ExchangeHelper.copyResults(exchange, exchange);
-
-            // logging nextExchange as it contains the exchange that might have altered the payload and since
-            // we are logging the completion if will be confusing if we log the original instead
-            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-            }
-
-            reactiveExecutor.schedule(callback);
-        }
-    }
-
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(processors);


[camel] 02/05: CAMEL-14354: camel-core optimize. Lets use a inner class for the after task instead of lambda so we have nicer strackrrace and can better understand them.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 11d4eab0f695d4011cda56503f6feea5290bb443
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 2 14:02:59 2020 +0100

    CAMEL-14354: camel-core optimize. Lets use a inner class for the after task instead of lambda so we have nicer strackrrace and can better understand them.
---
 .../camel/processor/CamelInternalProcessor.java    | 76 ++++++++++++++--------
 1 file changed, 48 insertions(+), 28 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index f2a7c7e..6b8265e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -146,6 +146,53 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         return null;
     }
 
+    /**
+     * Callback task to process the advices after processing.
+     */
+    private class AsyncAfterTask implements AsyncCallback {
+
+        private final Object[] states;
+        private final Exchange exchange;
+        private final AsyncCallback originalCallback;
+
+        private AsyncAfterTask(Object[] states, Exchange exchange, AsyncCallback originalCallback) {
+            this.states = states;
+            this.exchange = exchange;
+            this.originalCallback = originalCallback;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void done(boolean doneSync) {
+            try {
+                for (int i = advices.size() - 1, j = states.length - 1; i >= 0; i--) {
+                    CamelInternalProcessorAdvice task = advices.get(i);
+                    Object state = null;
+                    if (task.hasState()) {
+                        state = states[j--];
+                    }
+                    try {
+                        task.after(exchange, state);
+                    } catch (Throwable e) {
+                        exchange.setException(e);
+                        // allow all advices to complete even if there was an exception
+                    }
+                }
+            } finally {
+                // ----------------------------------------------------------
+                // CAMEL END USER - DEBUG ME HERE +++ START +++
+                // ----------------------------------------------------------
+                // callback must be called
+                if (originalCallback != null) {
+                    reactiveExecutor.schedule(originalCallback);
+                }
+                // ----------------------------------------------------------
+                // CAMEL END USER - DEBUG ME HERE +++ END +++
+                // ----------------------------------------------------------
+            }
+        }
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     public boolean process(Exchange exchange, AsyncCallback originalCallback) {
@@ -187,34 +234,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         }
 
         // create internal callback which will execute the advices in reverse order when done
-        AsyncCallback callback = doneSync -> {
-            try {
-                for (int i = advices.size() - 1, j = states.length - 1; i >= 0; i--) {
-                    CamelInternalProcessorAdvice task = advices.get(i);
-                    Object state = null;
-                    if (task.hasState()) {
-                        state = states[j--];
-                    }
-                    try {
-                        task.after(exchange, state);
-                    } catch (Throwable e) {
-                        exchange.setException(e);
-                        // allow all advices to complete even if there was an exception
-                    }
-                }
-            } finally {
-                // ----------------------------------------------------------
-                // CAMEL END USER - DEBUG ME HERE +++ START +++
-                // ----------------------------------------------------------
-                // callback must be called
-                if (originalCallback != null) {
-                    reactiveExecutor.schedule(originalCallback);
-                }
-                // ----------------------------------------------------------
-                // CAMEL END USER - DEBUG ME HERE +++ END +++
-                // ----------------------------------------------------------
-            }
-        };
+        AsyncCallback callback = new AsyncAfterTask(states, exchange, originalCallback);
 
         if (exchange.isTransacted()) {
             // must be synchronized for transacted exchanges


[camel] 01/05: Polished

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 773c8f6789e3ccd857dce0dabe616815c42c9487
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 2 10:30:31 2020 +0100

    Polished
---
 .../main/java/org/apache/camel/impl/engine/DefaultRouteContext.java | 6 +++---
 .../src/main/java/org/apache/camel/processor/Pipeline.java          | 1 -
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
index 9cf4ad0..afc8e95 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
@@ -208,10 +208,10 @@ public class DefaultRouteContext implements RouteContext {
             // wrap in route lifecycle
             internal.addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice());
 
-            // wrap in REST binding
+            // add advices
             advices.forEach(internal::addAdvice);
 
-            // and create the route that wraps the UoW
+            // and create the route that wraps all of this
             Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal);
             edcr.getProperties().putAll(properties);
 
@@ -572,7 +572,7 @@ public class DefaultRouteContext implements RouteContext {
         Set<NamedNode> list = getErrorHandlers(source);
         Set<NamedNode> previous = errorHandlers.put(target, list);
         if (list != previous && ObjectHelper.isNotEmpty(previous) && ObjectHelper.isNotEmpty(list)) {
-            throw new IllegalStateException("multiple references with different handlers");
+            throw new IllegalStateException("Multiple references with different handlers");
         }
     }
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index ee01a49..d39b02f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -40,7 +40,6 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
 
 /**


[camel] 05/05: CAMEL-14354: camel-core - Optimize to reduce calling method in critical path

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d7241e1bb045d333dd265d06ba2458994a77adad
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 2 19:18:21 2020 +0100

    CAMEL-14354: camel-core - Optimize to reduce calling method in critical path
---
 .../apache/camel/processor/CamelInternalProcessor.java  | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index d447036..84c4ac0 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -149,21 +149,30 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
     /**
      * Callback task to process the advices after processing.
      */
-    private class AsyncAfterTask implements AsyncCallback {
+    private static final class AsyncAfterTask implements AsyncCallback {
 
+        private final List<CamelInternalProcessorAdvice<?>> advices;
         private final Object[] states;
         private final Exchange exchange;
         private final AsyncCallback originalCallback;
+        private final ReactiveExecutor reactiveExecutor;
 
-        private AsyncAfterTask(Object[] states, Exchange exchange, AsyncCallback originalCallback) {
+        private AsyncAfterTask(List<CamelInternalProcessorAdvice<?>> advices, Object[] states,
+                               Exchange exchange, AsyncCallback originalCallback, ReactiveExecutor reactiveExecutor) {
+            this.advices = advices;
             this.states = states;
             this.exchange = exchange;
             this.originalCallback = originalCallback;
+            this.reactiveExecutor = reactiveExecutor;
         }
 
         @Override
-        @SuppressWarnings("unchecked")
         public void done(boolean doneSync) {
+            // noop as we override run method
+        }
+
+        @Override
+        public void run() {
             try {
                 for (int i = advices.size() - 1, j = states.length - 1; i >= 0; i--) {
                     CamelInternalProcessorAdvice task = advices.get(i);
@@ -247,7 +256,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         }
 
         // create internal callback which will execute the advices in reverse order when done
-        AsyncCallback callback = new AsyncAfterTask(states, exchange, originalCallback);
+        AsyncCallback callback = new AsyncAfterTask(advices, states, exchange, originalCallback, reactiveExecutor);
 
         if (exchange.isTransacted()) {
             // must be synchronized for transacted exchanges