You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/27 12:34:55 UTC

[camel] 01/04: CAMEL-14354: camel-core optimize

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a0a5b39c9196ff7ec583b0a759d5119078f96d15
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 27 12:42:02 2020 +0100

    CAMEL-14354: camel-core optimize
---
 .../src/main/java/org/apache/camel/Exchange.java   |  1 +
 .../java/org/apache/camel/ExtendedExchange.java    | 10 +++++
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  2 +-
 .../errorhandler/RedeliveryErrorHandler.java       | 46 ++++++++++------------
 .../org/apache/camel/support/DefaultExchange.java  | 13 +++++-
 .../org/apache/camel/support/ExchangeHelper.java   | 11 ------
 6 files changed, 44 insertions(+), 39 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 0c4da07..364722f 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -166,6 +166,7 @@ public interface Exchange {
 
     String INTERCEPTED_ENDPOINT = "CamelInterceptedEndpoint";
     String INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED = "CamelInterceptSendToEndpointWhenMatched";
+    @Deprecated
     String INTERRUPTED = "CamelInterrupted";
 
     String LANGUAGE_SCRIPT          = "CamelLanguageScript";
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index 840cc93..e3681e9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -108,4 +108,14 @@ public interface ExtendedExchange extends Exchange {
      */
     void setNotifyEvent(boolean notifyEvent);
 
+    /**
+     * Whether the exchange was interrupted (InterruptedException) during routing.
+     */
+    boolean isInterrupted();
+
+    /**
+     * Used to signal that this exchange was interrupted (InterruptedException) during routing.
+     */
+    void setInterrupted(boolean interrupted);
+
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index fb67842..fade8a4 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -191,7 +191,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
                     interruptedCounter.incrementAndGet();
                 }
                 exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
-                exchange.setProperty(Exchange.INTERRUPTED, Boolean.TRUE);
+                exchange.adapt(ExtendedExchange.class).setInterrupted(true);
                 entry.getLatch().countDown();
             }
         }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 30d2eab..ca010ff 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -30,6 +30,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Message;
 import org.apache.camel.Navigate;
@@ -276,35 +277,26 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
      * Strategy to determine if the exchange is done so we can continue
      */
     protected boolean isDone(Exchange exchange) {
-        boolean answer = isCancelledOrInterrupted(exchange);
+        if (((ExtendedExchange) exchange).isInterrupted()) {
+            // mark the exchange to stop routing when interrupted
+            // as we do not want to continue routing (for example a task has been cancelled)
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId());
+            }
+            exchange.setRouteStop(true);
+            return true;
+        }
 
         // only done if the exchange hasn't failed
         // and it has not been handled by the failure processor
         // or we are exhausted
-        if (!answer) {
-            answer = exchange.getException() == null
-                || ExchangeHelper.isFailureHandled(exchange)
-                || ExchangeHelper.isRedeliveryExhausted(exchange);
-        }
-
-        LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
-        return answer;
-    }
+        boolean answer = exchange.getException() == null
+            || ExchangeHelper.isFailureHandled(exchange)
+            || ExchangeHelper.isRedeliveryExhausted(exchange);
 
-    /**
-     * Strategy to determine if the exchange was cancelled or interrupted
-     */
-    protected boolean isCancelledOrInterrupted(Exchange exchange) {
-        boolean answer = false;
-
-        if (ExchangeHelper.isInterrupted(exchange)) {
-            // mark the exchange to stop continue routing when interrupted
-            // as we do not want to continue routing (for example a task has been cancelled)
-            exchange.setRouteStop(true);
-            answer = true;
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
         }
-
-        LOG.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer);
         return answer;
     }
 
@@ -386,14 +378,14 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
 
             // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
             // original Exchange is being redelivered, and not a mutated Exchange
-            this.original = defensiveCopyExchangeIfNeeded(exchange);
+            this.original = redeliveryEnabled ? defensiveCopyExchangeIfNeeded(exchange) : null;
             this.exchange = exchange;
             this.callback = callback;
         }
 
         @Override
         public String toString() {
-            return "Step[" + exchange.getExchangeId() + "," + RedeliveryErrorHandler.this + "]";
+            return "RedeliveryState";
         }
 
         /**
@@ -564,7 +556,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
 
             // process the exchange (also redelivery)
             outputAsync.process(exchange, doneSync -> {
-                LOG.trace("Redelivering exchangeId: {}", exchange.getExchangeId());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Redelivering exchangeId: {}", exchange.getExchangeId());
+                }
 
                 // only process if the exchange hasn't failed
                 // and it has not been handled by the error processor
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index e78e59b..3961ec7 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -62,6 +62,7 @@ public final class DefaultExchange implements ExtendedExchange {
     private boolean rollbackOnly;
     private boolean rollbackOnlyLast;
     private boolean notifyEvent;
+    private boolean interrupted;
 
     public DefaultExchange(CamelContext context) {
         this(context, ExchangePattern.InOnly);
@@ -411,7 +412,7 @@ public final class DefaultExchange implements ExtendedExchange {
         }
         if (t instanceof InterruptedException) {
             // mark the exchange as interrupted due to the interrupt exception
-            setProperty(Exchange.INTERRUPTED, Boolean.TRUE);
+            setInterrupted(true);
         }
     }
 
@@ -628,6 +629,16 @@ public final class DefaultExchange implements ExtendedExchange {
         this.notifyEvent = notifyEvent;
     }
 
+    @Override
+    public boolean isInterrupted() {
+        return interrupted;
+    }
+
+    @Override
+    public void setInterrupted(boolean interrupted) {
+        this.interrupted = interrupted;
+    }
+
     /**
      * Configures the message after it has been set on the exchange
      */
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 5f6b938..46a9105 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -603,17 +603,6 @@ public final class ExchangeHelper {
     }
 
     /**
-     * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing
-     *
-     * @param exchange  the exchange
-     * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise
-     */
-    public static boolean isInterrupted(Exchange exchange) {
-        Object value = exchange.getProperty(Exchange.INTERRUPTED);
-        return value != null && Boolean.TRUE == value;
-    }
-
-    /**
      * Check whether or not stream caching is enabled for the given route or globally.
      *
      * @param exchange  the exchange