You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/27 12:34:54 UTC

[camel] branch master updated (8226bcd -> 187ab82)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 8226bcd  CAMEL-14354: camel-core optimize
     new a0a5b39  CAMEL-14354: camel-core optimize
     new ac875ba  CAMEL-14354: camel-core optimize
     new 7acff9d  CAMEL-14354: camel-core optimize
     new 187ab82  CAMEL-14354: camel-core optimize

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/java/org/apache/camel/Exchange.java   |  1 +
 .../java/org/apache/camel/ExtendedExchange.java    | 10 +++++
 .../org/apache/camel/spi/UnitOfWorkFactory.java    |  9 ++++-
 .../engine/DefaultAsyncProcessorAwaitManager.java  | 24 +++++++----
 .../impl/engine/DefaultUnitOfWorkFactory.java      | 14 ++++++-
 .../camel/processor/CamelInternalProcessor.java    |  3 ++
 .../camel/processor/DelayProcessorSupport.java     | 22 ++++++++---
 .../java/org/apache/camel/processor/Enricher.java  |  8 +++-
 .../apache/camel/processor/FinallyProcessor.java   |  4 +-
 .../org/apache/camel/processor/LoopProcessor.java  | 10 +++--
 .../apache/camel/processor/MulticastProcessor.java |  2 +-
 .../org/apache/camel/processor/RoutingSlip.java    | 24 ++++++++---
 .../java/org/apache/camel/processor/Throttler.java | 36 ++++++++++++-----
 .../org/apache/camel/processor/TryProcessor.java   | 10 +++--
 .../processor/aggregate/AggregateProcessor.java    |  4 +-
 .../errorhandler/RedeliveryErrorHandler.java       | 46 ++++++++++------------
 .../loadbalancer/FailOverLoadBalancer.java         |  8 +++-
 .../org/apache/camel/support/DefaultExchange.java  | 13 +++++-
 .../org/apache/camel/support/ExchangeHelper.java   | 15 ++-----
 .../org/apache/camel/support/UnitOfWorkHelper.java |  4 --
 20 files changed, 179 insertions(+), 88 deletions(-)


[camel] 02/04: CAMEL-14354: camel-core optimize

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ac875ba4e3e81c3f849b77617b98805b336f0a99
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 27 12:59:32 2020 +0100

    CAMEL-14354: camel-core optimize
---
 .../engine/DefaultAsyncProcessorAwaitManager.java  | 22 ++++++++-----
 .../camel/processor/DelayProcessorSupport.java     | 22 +++++++++----
 .../java/org/apache/camel/processor/Enricher.java  |  8 +++--
 .../apache/camel/processor/FinallyProcessor.java   |  4 ++-
 .../org/apache/camel/processor/LoopProcessor.java  | 10 ++++--
 .../apache/camel/processor/MulticastProcessor.java |  2 +-
 .../org/apache/camel/processor/RoutingSlip.java    | 24 +++++++++++----
 .../java/org/apache/camel/processor/Throttler.java | 36 ++++++++++++++++------
 .../org/apache/camel/processor/TryProcessor.java   | 10 ++++--
 .../processor/aggregate/AggregateProcessor.java    |  4 ++-
 .../loadbalancer/FailOverLoadBalancer.java         |  8 +++--
 11 files changed, 109 insertions(+), 41 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index fade8a4..3db01a4 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -96,20 +96,26 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
             }
         } while (reactiveExecutor.executeFromQueue());
 
-        LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
-                exchange.getExchangeId(), exchange);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
+                    exchange.getExchangeId(), exchange);
+        }
         try {
             if (statistics.isStatisticsEnabled()) {
                 blockedCounter.incrementAndGet();
             }
             inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch));
             latch.await();
-            LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
-                    exchange.getExchangeId(), exchange);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
+                        exchange.getExchangeId(), exchange);
+            }
 
         } catch (InterruptedException e) {
-            LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}",
-                    exchange.getExchangeId(), exchange);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}",
+                        exchange.getExchangeId(), exchange);
+            }
             exchange.setException(e);
         } finally {
             AwaitThread thread = inflight.remove(exchange);
@@ -134,7 +140,9 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
     }
 
     public void countDown(Exchange exchange, CountDownLatch latch) {
-        LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
+        }
         latch.countDown();
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/core/camel-base/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index 42c6176..e65164a 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -65,7 +65,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor imple
             // we are running now so decrement the counter
             delayedCount.decrementAndGet();
 
-            LOG.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId());
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId());
+            }
             if (!isRunAllowed()) {
                 exchange.setException(new RejectedExecutionException("Run is not allowed"));
             }
@@ -74,7 +76,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor imple
             DelayProcessorSupport.this.processor.process(exchange, new AsyncCallback() {
                 @Override
                 public void done(boolean doneSync) {
-                    LOG.trace("Delayed task done for exchangeId: {}", exchange.getExchangeId());
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Delayed task done for exchangeId: {}", exchange.getExchangeId());
+                    }
                     // we must done the callback from this async callback as well, to ensure callback is done correctly
                     // must invoke done on callback with false, as that is what the original caller would
                     // expect as we returned false in the process method
@@ -114,8 +118,10 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor imple
             delayedCount.incrementAndGet();
             ProcessCall call = new ProcessCall(exchange, callback);
             try {
-                LOG.trace("Scheduling delayed task to run in {} millis for exchangeId: {}",
-                        delay, exchange.getExchangeId());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Scheduling delayed task to run in {} millis for exchangeId: {}",
+                            delay, exchange.getExchangeId());
+                }
                 executorService.schedule(call, delay, TimeUnit.MILLISECONDS);
                 // tell Camel routing engine we continue routing asynchronous
                 return false;
@@ -126,7 +132,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor imple
                     if (!isRunAllowed()) {
                         exchange.setException(new RejectedExecutionException());
                     } else {
-                        LOG.debug("Scheduling rejected task, so letting caller run, delaying at first for {} millis for exchangeId: {}", delay, exchange.getExchangeId());
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Scheduling rejected task, so letting caller run, delaying at first for {} millis for exchangeId: {}", delay, exchange.getExchangeId());
+                        }
                         // let caller run by processing
                         try {
                             delay(delay, exchange);
@@ -160,7 +168,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor imple
             delay = calculateDelay(exchange);
             if (delay <= 0) {
                 // no delay then continue routing
-                LOG.trace("No delay for exchangeId: {}", exchange.getExchangeId());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("No delay for exchangeId: {}", exchange.getExchangeId());
+                }
                 return processor.process(exchange, callback);
             }
         } catch (Throwable e) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
index a63a747..bbbca1b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
@@ -254,13 +254,17 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         });
 
         if (!sync) {
-            LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
+            }
             // the remainder of the routing slip will be completed async
             // so we break out now, then the callback will be invoked which then continue routing from where we left here
             return false;
         }
 
-        LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
+        }
 
         if (watch != null) {
             // emit event that the exchange was sent to the endpoint
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java
index 7a65995..447dff3 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -115,7 +115,9 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl
                 if (!doneSync) {
                     // signal callback to continue routing async
                     ExchangeHelper.prepareOutToIn(exchange);
-                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                    }
                 }
             } finally {
                 // callback must always be called
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
index a70d550..278a507 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -129,11 +129,15 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                 } else {
                     // we are done so prepare the result
                     ExchangeHelper.copyResults(exchange, current);
-                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                    }
                     callback.done(false);
                 }
             } catch (Exception e) {
-                LOG.trace("Processing failed for exchangeId: {} >>> {}", exchange.getExchangeId(), e.getMessage());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing failed for exchangeId: {} >>> {}", exchange.getExchangeId(), e.getMessage());
+                }
                 exchange.setException(e);
                 callback.done(false);
             }
@@ -141,7 +145,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
 
         @Override
         public String toString() {
-            return "LoopState[" + exchange.getExchangeId() + "]";
+            return "LoopState";
         }
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 82bafea..e9bb034 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -302,7 +302,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
         @Override
         public String toString() {
-            return "MulticastTask[" + original.getExchangeId() + "," + MulticastProcessor.this + "]";
+            return "MulticastTask";
         }
 
         @Override
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
index eec3a47..7069354 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -258,13 +258,17 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
             current = prepareExchangeForRoutingSlip(current, endpoint);
             
             if (!sync) {
-                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
+                }
                 // the remainder of the routing slip will be completed async
                 // so we break out now, then the callback will be invoked which then continue routing from where we left here
                 return false;
             }
 
-            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
+            }
 
             // we ignore some kind of exceptions and allow us to continue
             if (isIgnoreInvalidEndpoints()) {
@@ -287,7 +291,9 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
         // logging nextExchange as it contains the exchange that might have altered the payload and since
         // we are logging the completion if will be confusing if we log the original instead
         // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
-        LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current);
+        }
 
         // copy results back to the original exchange
         ExchangeHelper.copyResults(exchange, current);
@@ -350,7 +356,9 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
                                       final AsyncCallback originalCallback, final RoutingSlipIterator iter) {
 
         // this does the actual processing so log at trace level
-        LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+        }
 
         // routing slip callback which are used when
         // - routing slip was routed asynchronously
@@ -424,7 +432,9 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
                             current = prepareExchangeForRoutingSlip(current, endpoint1);
 
                             if (!sync) {
-                                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
+                                if (LOG.isTraceEnabled()) {
+                                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
+                                }
                                 return;
                             }
                         }
@@ -432,7 +442,9 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
                         // logging nextExchange as it contains the exchange that might have altered the payload and since
                         // we are logging the completion if will be confusing if we log the original instead
                         // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
-                        LOG.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current);
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current);
+                        }
 
                         // copy results back to the original exchange
                         ExchangeHelper.copyResults(original, current);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java b/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java
index ea5a536..b518a4c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java
@@ -135,7 +135,9 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa
                 } else {
                     // delegate to async pool
                     if (isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC) {
-                        LOG.debug("Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", exchange.getExchangeId());
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", exchange.getExchangeId());
+                        }
                         return processAsynchronously(exchange, callback, throttlingState);
                     }
 
@@ -154,10 +156,14 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa
                     if (state == State.ASYNC) {
                         if (LOG.isTraceEnabled()) {
                             long queuedTime = start - queuedStart;
-                            LOG.trace("Queued for {}ms, Throttled for {}ms, exchangeId: {}", queuedTime, elapsed, exchange.getExchangeId());
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Queued for {}ms, Throttled for {}ms, exchangeId: {}", queuedTime, elapsed, exchange.getExchangeId());
+                            }
                         }
                     } else {
-                        LOG.trace("Throttled for {}ms, exchangeId: {}", elapsed, exchange.getExchangeId());
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Throttled for {}ms, exchangeId: {}", elapsed, exchange.getExchangeId());
+                        }
                     }
                 }
             } else {
@@ -169,7 +175,9 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa
                         LOG.trace("Queued for {}ms, No throttling applied (throttle cleared while queued), for exchangeId: {}", queuedTime, exchange.getExchangeId());
                     }
                 } else {
-                    LOG.trace("No throttling applied to exchangeId: {}", exchange.getExchangeId());
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("No throttling applied to exchangeId: {}", exchange.getExchangeId());
+                    }
                 }
             }
 
@@ -211,7 +219,9 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa
             return false;
         } catch (final RejectedExecutionException e) {
             if (isCallerRunsWhenRejected()) {
-                LOG.debug("AsyncExecutor is full, rejected exchange will run in the current thread, exchangeId: {}", exchange.getExchangeId());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("AsyncExecutor is full, rejected exchange will run in the current thread, exchangeId: {}", exchange.getExchangeId());
+                }
                 exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC_REJECTED);
                 return process(exchange, callback);
             }
@@ -314,9 +324,13 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa
                         while (delta > 0) {
                             delayQueue.take();
                             delta--;
-                            LOG.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", exchange.getExchangeId());
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", exchange.getExchangeId());
+                            }
+                        }
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
                         }
-                        LOG.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
 
                         // increase
                     } else if (newThrottle > throttleRate) {
@@ -325,9 +339,13 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa
                             delayQueue.put(new ThrottlePermit(-1));
                         }
                         if (throttleRate == 0) {
-                            LOG.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", newThrottle, exchange.getExchangeId());
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", newThrottle, exchange.getExchangeId());
+                            }
                         } else {
-                            LOG.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
+                            }
                         }
                     }
                     throttleRate = newThrottle;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
index c13baaa..a556195 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -101,20 +101,24 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
                 // process the next processor
                 Processor processor = processors.next();
                 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
-                LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                }
                 async.process(exchange, doneSync -> reactiveExecutor.schedule(this));
             } else {
                 ExchangeHelper.prepareOutToIn(exchange);
                 exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
                 exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled);
-                LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                }
                 callback.done(false);
             }
         }
 
         @Override
         public String toString() {
-            return "TryState[" + exchange.getExchangeId() + "]";
+            return "TryState";
         }
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index a8bde2d..503e2c7 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -893,7 +893,9 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             // grab the timeout value
             long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0;
             if (timeout > 0) {
-                LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout);
+                }
                 addExchangeToTimeoutMap(key, exchange, timeout);
             }
         }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
index 286ea75..6a8f900 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
@@ -147,7 +147,9 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
             }
         }
 
-        LOG.trace("Should failover: {} for exchangeId: {}", answer, exchange.getExchangeId());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Should failover: {} for exchangeId: {}", answer, exchange.getExchangeId());
+        }
 
         return answer;
     }
@@ -201,7 +203,9 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
                 lastGoodIndex.set(index);
                 // and copy the current result to original so it will contain this result of this eip
                 ExchangeHelper.copyResults(exchange, copy);
-                LOG.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                }
                 callback.done(false);
                 return;
             }


[camel] 01/04: CAMEL-14354: camel-core optimize

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a0a5b39c9196ff7ec583b0a759d5119078f96d15
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 27 12:42:02 2020 +0100

    CAMEL-14354: camel-core optimize
---
 .../src/main/java/org/apache/camel/Exchange.java   |  1 +
 .../java/org/apache/camel/ExtendedExchange.java    | 10 +++++
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  2 +-
 .../errorhandler/RedeliveryErrorHandler.java       | 46 ++++++++++------------
 .../org/apache/camel/support/DefaultExchange.java  | 13 +++++-
 .../org/apache/camel/support/ExchangeHelper.java   | 11 ------
 6 files changed, 44 insertions(+), 39 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 0c4da07..364722f 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -166,6 +166,7 @@ public interface Exchange {
 
     String INTERCEPTED_ENDPOINT = "CamelInterceptedEndpoint";
     String INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED = "CamelInterceptSendToEndpointWhenMatched";
+    @Deprecated
     String INTERRUPTED = "CamelInterrupted";
 
     String LANGUAGE_SCRIPT          = "CamelLanguageScript";
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index 840cc93..e3681e9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -108,4 +108,14 @@ public interface ExtendedExchange extends Exchange {
      */
     void setNotifyEvent(boolean notifyEvent);
 
+    /**
+     * Whether the exchange was interrupted (InterruptedException) during routing.
+     */
+    boolean isInterrupted();
+
+    /**
+     * Used to signal that this exchange was interrupted (InterruptedException) during routing.
+     */
+    void setInterrupted(boolean interrupted);
+
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index fb67842..fade8a4 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -191,7 +191,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
                     interruptedCounter.incrementAndGet();
                 }
                 exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
-                exchange.setProperty(Exchange.INTERRUPTED, Boolean.TRUE);
+                exchange.adapt(ExtendedExchange.class).setInterrupted(true);
                 entry.getLatch().countDown();
             }
         }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 30d2eab..ca010ff 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -30,6 +30,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Message;
 import org.apache.camel.Navigate;
@@ -276,35 +277,26 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
      * Strategy to determine if the exchange is done so we can continue
      */
     protected boolean isDone(Exchange exchange) {
-        boolean answer = isCancelledOrInterrupted(exchange);
+        if (((ExtendedExchange) exchange).isInterrupted()) {
+            // mark the exchange to stop routing when interrupted
+            // as we do not want to continue routing (for example a task has been cancelled)
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId());
+            }
+            exchange.setRouteStop(true);
+            return true;
+        }
 
         // only done if the exchange hasn't failed
         // and it has not been handled by the failure processor
         // or we are exhausted
-        if (!answer) {
-            answer = exchange.getException() == null
-                || ExchangeHelper.isFailureHandled(exchange)
-                || ExchangeHelper.isRedeliveryExhausted(exchange);
-        }
-
-        LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
-        return answer;
-    }
+        boolean answer = exchange.getException() == null
+            || ExchangeHelper.isFailureHandled(exchange)
+            || ExchangeHelper.isRedeliveryExhausted(exchange);
 
-    /**
-     * Strategy to determine if the exchange was cancelled or interrupted
-     */
-    protected boolean isCancelledOrInterrupted(Exchange exchange) {
-        boolean answer = false;
-
-        if (ExchangeHelper.isInterrupted(exchange)) {
-            // mark the exchange to stop continue routing when interrupted
-            // as we do not want to continue routing (for example a task has been cancelled)
-            exchange.setRouteStop(true);
-            answer = true;
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
         }
-
-        LOG.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer);
         return answer;
     }
 
@@ -386,14 +378,14 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
 
             // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
             // original Exchange is being redelivered, and not a mutated Exchange
-            this.original = defensiveCopyExchangeIfNeeded(exchange);
+            this.original = redeliveryEnabled ? defensiveCopyExchangeIfNeeded(exchange) : null;
             this.exchange = exchange;
             this.callback = callback;
         }
 
         @Override
         public String toString() {
-            return "Step[" + exchange.getExchangeId() + "," + RedeliveryErrorHandler.this + "]";
+            return "RedeliveryState";
         }
 
         /**
@@ -564,7 +556,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
 
             // process the exchange (also redelivery)
             outputAsync.process(exchange, doneSync -> {
-                LOG.trace("Redelivering exchangeId: {}", exchange.getExchangeId());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Redelivering exchangeId: {}", exchange.getExchangeId());
+                }
 
                 // only process if the exchange hasn't failed
                 // and it has not been handled by the error processor
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index e78e59b..3961ec7 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -62,6 +62,7 @@ public final class DefaultExchange implements ExtendedExchange {
     private boolean rollbackOnly;
     private boolean rollbackOnlyLast;
     private boolean notifyEvent;
+    private boolean interrupted;
 
     public DefaultExchange(CamelContext context) {
         this(context, ExchangePattern.InOnly);
@@ -411,7 +412,7 @@ public final class DefaultExchange implements ExtendedExchange {
         }
         if (t instanceof InterruptedException) {
             // mark the exchange as interrupted due to the interrupt exception
-            setProperty(Exchange.INTERRUPTED, Boolean.TRUE);
+            setInterrupted(true);
         }
     }
 
@@ -628,6 +629,16 @@ public final class DefaultExchange implements ExtendedExchange {
         this.notifyEvent = notifyEvent;
     }
 
+    @Override
+    public boolean isInterrupted() {
+        return interrupted;
+    }
+
+    @Override
+    public void setInterrupted(boolean interrupted) {
+        this.interrupted = interrupted;
+    }
+
     /**
      * Configures the message after it has been set on the exchange
      */
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 5f6b938..46a9105 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -603,17 +603,6 @@ public final class ExchangeHelper {
     }
 
     /**
-     * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing
-     *
-     * @param exchange  the exchange
-     * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise
-     */
-    public static boolean isInterrupted(Exchange exchange) {
-        Object value = exchange.getProperty(Exchange.INTERRUPTED);
-        return value != null && Boolean.TRUE == value;
-    }
-
-    /**
      * Check whether or not stream caching is enabled for the given route or globally.
      *
      * @param exchange  the exchange


[camel] 03/04: CAMEL-14354: camel-core optimize

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7acff9d6201da53d6746a4dc88d8169c7b489b02
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 27 13:27:05 2020 +0100

    CAMEL-14354: camel-core optimize
---
 .../main/java/org/apache/camel/spi/UnitOfWorkFactory.java  |  9 ++++++++-
 .../apache/camel/impl/engine/DefaultUnitOfWorkFactory.java | 14 ++++++++++++--
 .../org/apache/camel/processor/CamelInternalProcessor.java |  3 +++
 3 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java
index 9af44dc..64b2429 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java
@@ -16,12 +16,14 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.AfterPropertiesConfigured;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 
 /**
  * Factory to create {@link org.apache.camel.spi.UnitOfWork}.
  */
-public interface UnitOfWorkFactory {
+public interface UnitOfWorkFactory extends AfterPropertiesConfigured {
 
     /**
      * Creates a new {@link UnitOfWork}
@@ -30,5 +32,10 @@ public interface UnitOfWorkFactory {
      * @return the created {@link UnitOfWork}
      */
     UnitOfWork createUnitOfWork(Exchange exchange);
+
+    @Override
+    default void afterPropertiesConfigured(CamelContext camelContext) {
+        // noop
+    }
 }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWorkFactory.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWorkFactory.java
index a2af28e..69e4f93 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWorkFactory.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWorkFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.impl.engine;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.spi.UnitOfWorkFactory;
@@ -25,15 +26,24 @@ import org.apache.camel.spi.UnitOfWorkFactory;
  */
 public class DefaultUnitOfWorkFactory implements UnitOfWorkFactory {
 
+    private boolean usedMDCLogging;
+    private String mdcLoggingKeysPattern;
+
     @Override
     public UnitOfWork createUnitOfWork(Exchange exchange) {
         UnitOfWork answer;
-        if (exchange.getContext().isUseMDCLogging()) {
-            answer = new MDCUnitOfWork(exchange, exchange.getContext().getMDCLoggingKeysPattern());
+        if (usedMDCLogging) {
+            answer = new MDCUnitOfWork(exchange, mdcLoggingKeysPattern);
         } else {
             answer = new DefaultUnitOfWork(exchange);
         }
         return answer;
     }
 
+    @Override
+    public void afterPropertiesConfigured(CamelContext camelContext) {
+        usedMDCLogging = camelContext.isUseMDCLogging() != null && camelContext.isUseMDCLogging();
+        mdcLoggingKeysPattern = camelContext.getMDCLoggingKeysPattern();
+    }
+
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 8c7dcb1..c1e4d3b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -60,6 +60,7 @@ import org.apache.camel.support.OrderedComparator;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -595,6 +596,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
             if (routeContext != null) {
                 this.routeId = routeContext.getRouteId();
                 this.uowFactory = routeContext.getCamelContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory();
+                // optimize uow factory to initialize it early and once per advice
+                this.uowFactory.afterPropertiesConfigured(routeContext.getCamelContext());
             }
         }
 


[camel] 04/04: CAMEL-14354: camel-core optimize

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 187ab82ea76ef51f4de1dd9d298a56a36ef71795
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 27 13:32:45 2020 +0100

    CAMEL-14354: camel-core optimize
---
 .../src/main/java/org/apache/camel/support/ExchangeHelper.java        | 4 +++-
 .../src/main/java/org/apache/camel/support/UnitOfWorkHelper.java      | 4 ----
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 46a9105..b2bd26a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -255,7 +255,9 @@ public final class ExchangeHelper {
             copy.getIn().setMessageId(null);
         }
         // do not share the unit of work
-        copy.adapt(ExtendedExchange.class).setUnitOfWork(null);
+        ExtendedExchange ee = (ExtendedExchange) exchange;
+        ee.setUnitOfWork(null);
+
         // do not reuse the message id
         // hand over on completion to the copy if we got any
         UnitOfWork uow = exchange.getUnitOfWork();
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
index d6405fd..7c03036 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
@@ -63,10 +63,6 @@ public final class UnitOfWorkHelper {
             LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange
                     + ". This exception will be ignored.", e);
         }
-
-        // remove uow from exchange as its done
-        ExtendedExchange ee = (ExtendedExchange) exchange;
-        ee.setUnitOfWork(null);
     }
 
     public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) {