You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/27 12:34:55 UTC
[camel] 01/04: CAMEL-14354: camel-core optimize
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit a0a5b39c9196ff7ec583b0a759d5119078f96d15
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 27 12:42:02 2020 +0100
CAMEL-14354: camel-core optimize
---
.../src/main/java/org/apache/camel/Exchange.java | 1 +
.../java/org/apache/camel/ExtendedExchange.java | 10 +++++
.../engine/DefaultAsyncProcessorAwaitManager.java | 2 +-
.../errorhandler/RedeliveryErrorHandler.java | 46 ++++++++++------------
.../org/apache/camel/support/DefaultExchange.java | 13 +++++-
.../org/apache/camel/support/ExchangeHelper.java | 11 ------
6 files changed, 44 insertions(+), 39 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 0c4da07..364722f 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -166,6 +166,7 @@ public interface Exchange {
String INTERCEPTED_ENDPOINT = "CamelInterceptedEndpoint";
String INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED = "CamelInterceptSendToEndpointWhenMatched";
+ @Deprecated
String INTERRUPTED = "CamelInterrupted";
String LANGUAGE_SCRIPT = "CamelLanguageScript";
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index 840cc93..e3681e9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -108,4 +108,14 @@ public interface ExtendedExchange extends Exchange {
*/
void setNotifyEvent(boolean notifyEvent);
+ /**
+ * Whether the exchange was interrupted (InterruptException) during routing.
+ */
+ boolean isInterrupted();
+
+ /**
+ * Used to signal that this exchange was interrupted (InterruptException) during routing.
+ */
+ void setInterrupted(boolean interrupted);
+
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index fb67842..fade8a4 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -191,7 +191,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
interruptedCounter.incrementAndGet();
}
exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
- exchange.setProperty(Exchange.INTERRUPTED, Boolean.TRUE);
+ exchange.adapt(ExtendedExchange.class).setInterrupted(true);
entry.getLatch().countDown();
}
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 30d2eab..ca010ff 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -30,6 +30,7 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Navigate;
@@ -276,35 +277,26 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
* Strategy to determine if the exchange is done so we can continue
*/
protected boolean isDone(Exchange exchange) {
- boolean answer = isCancelledOrInterrupted(exchange);
+ if (((ExtendedExchange) exchange).isInterrupted()) {
+ // mark the exchange to stop continue routing when interrupted
+ // as we do not want to continue routing (for example a task has been cancelled)
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId());
+ }
+ exchange.setRouteStop(true);
+ return true;
+ }
// only done if the exchange hasn't failed
// and it has not been handled by the failure processor
// or we are exhausted
- if (!answer) {
- answer = exchange.getException() == null
- || ExchangeHelper.isFailureHandled(exchange)
- || ExchangeHelper.isRedeliveryExhausted(exchange);
- }
-
- LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
- return answer;
- }
+ boolean answer = exchange.getException() == null
+ || ExchangeHelper.isFailureHandled(exchange)
+ || ExchangeHelper.isRedeliveryExhausted(exchange);
- /**
- * Strategy to determine if the exchange was cancelled or interrupted
- */
- protected boolean isCancelledOrInterrupted(Exchange exchange) {
- boolean answer = false;
-
- if (ExchangeHelper.isInterrupted(exchange)) {
- // mark the exchange to stop continue routing when interrupted
- // as we do not want to continue routing (for example a task has been cancelled)
- exchange.setRouteStop(true);
- answer = true;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
}
-
- LOG.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer);
return answer;
}
@@ -386,14 +378,14 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
// do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
// original Exchange is being redelivered, and not a mutated Exchange
- this.original = defensiveCopyExchangeIfNeeded(exchange);
+ this.original = redeliveryEnabled ? defensiveCopyExchangeIfNeeded(exchange) : null;
this.exchange = exchange;
this.callback = callback;
}
@Override
public String toString() {
- return "Step[" + exchange.getExchangeId() + "," + RedeliveryErrorHandler.this + "]";
+ return "RedeliveryState";
}
/**
@@ -564,7 +556,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
// process the exchange (also redelivery)
outputAsync.process(exchange, doneSync -> {
- LOG.trace("Redelivering exchangeId: {}", exchange.getExchangeId());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Redelivering exchangeId: {}", exchange.getExchangeId());
+ }
// only process if the exchange hasn't failed
// and it has not been handled by the error processor
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index e78e59b..3961ec7 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -62,6 +62,7 @@ public final class DefaultExchange implements ExtendedExchange {
private boolean rollbackOnly;
private boolean rollbackOnlyLast;
private boolean notifyEvent;
+ private boolean interrupted;
public DefaultExchange(CamelContext context) {
this(context, ExchangePattern.InOnly);
@@ -411,7 +412,7 @@ public final class DefaultExchange implements ExtendedExchange {
}
if (t instanceof InterruptedException) {
// mark the exchange as interrupted due to the interrupt exception
- setProperty(Exchange.INTERRUPTED, Boolean.TRUE);
+ setInterrupted(true);
}
}
@@ -628,6 +629,16 @@ public final class DefaultExchange implements ExtendedExchange {
this.notifyEvent = notifyEvent;
}
+ @Override
+ public boolean isInterrupted() {
+ return interrupted;
+ }
+
+ @Override
+ public void setInterrupted(boolean interrupted) {
+ this.interrupted = interrupted;
+ }
+
/**
* Configures the message after it has been set on the exchange
*/
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 5f6b938..46a9105 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -603,17 +603,6 @@ public final class ExchangeHelper {
}
/**
- * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing
- *
- * @param exchange the exchange
- * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise
- */
- public static boolean isInterrupted(Exchange exchange) {
- Object value = exchange.getProperty(Exchange.INTERRUPTED);
- return value != null && Boolean.TRUE == value;
- }
-
- /**
* Check whether or not stream caching is enabled for the given route or globally.
*
* @param exchange the exchange