You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/20 19:03:46 UTC
svn commit: r1137705 - in
/camel/trunk/camel-core/src/main/java/org/apache/camel/processor:
ErrorHandlerSupport.java RedeliveryErrorHandler.java
Author: davsclaus
Date: Mon Jun 20 17:03:45 2011
New Revision: 1137705
URL: http://svn.apache.org/viewvc?rev=1137705&view=rev
Log:
CAMEL-1817: Optimized to only do defensive copy of Exchange if redelivery has been enabled.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java?rev=1137705&r1=1137704&r2=1137705&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java Mon Jun 20 17:03:45 2011
@@ -41,8 +41,8 @@ public abstract class ErrorHandlerSuppor
protected final transient Logger log = LoggerFactory.getLogger(getClass());
- private final Map<ExceptionPolicyKey, OnExceptionDefinition> exceptionPolicies = new LinkedHashMap<ExceptionPolicyKey, OnExceptionDefinition>();
- private ExceptionPolicyStrategy exceptionPolicy = createDefaultExceptionPolicyStrategy();
+ protected final Map<ExceptionPolicyKey, OnExceptionDefinition> exceptionPolicies = new LinkedHashMap<ExceptionPolicyKey, OnExceptionDefinition>();
+ protected ExceptionPolicyStrategy exceptionPolicy = createDefaultExceptionPolicyStrategy();
public void addExceptionPolicy(OnExceptionDefinition exceptionType) {
Processor processor = exceptionType.getErrorHandler();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1137705&r1=1137704&r2=1137705&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Mon Jun 20 17:03:45 2011
@@ -33,6 +33,7 @@ import org.apache.camel.impl.converter.A
import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.spi.SubUnitOfWorkCallback;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.MessageHelper;
@@ -46,7 +47,7 @@ import org.apache.camel.util.ServiceHelp
* This implementation should contain all the error handling logic and the sub classes
* should only configure it according to what they support.
*
- * @version
+ * @version
*/
public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor {
@@ -62,6 +63,7 @@ public abstract class RedeliveryErrorHan
protected final Predicate retryWhilePolicy;
protected final CamelLogger logger;
protected final boolean useOriginalMessagePolicy;
+ protected boolean redeliveryEnabled;
/**
* Contains the current redelivery data
@@ -88,7 +90,7 @@ public abstract class RedeliveryErrorHan
/**
* Tasks which performs asynchronous redelivery attempts, and being triggered by a
* {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
- * has to be delayed before a redelivery attempt is performed.
+ * has to be delayed before a redelivery attempt is performed.
*/
private class AsyncRedeliveryTask implements Callable<Boolean> {
@@ -171,8 +173,8 @@ public abstract class RedeliveryErrorHan
}
}
- public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
- Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
+ public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
+ Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile, String executorServiceRef) {
ObjectHelper.notNull(camelContext, "CamelContext", this);
@@ -207,11 +209,6 @@ public abstract class RedeliveryErrorHan
return processErrorHandler(exchange, callback, new RedeliveryData());
}
- protected Exchange defensiveCopyExchange(Exchange exchange) {
- // TODO: Optimize to only copy if redelivery is possible/enabled
- return ExchangeHelper.createCopy(exchange, true);
- }
-
/**
* Process the exchange using redelivery error handling.
*/
@@ -219,7 +216,7 @@ public abstract class RedeliveryErrorHan
// do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
// original Exchange is being redelivered, and not a mutated Exchange
- data.original = defensiveCopyExchange(exchange);
+ data.original = defensiveCopyExchangeIfNeeded(exchange);
// use looping to have redelivery attempts
while (true) {
@@ -429,6 +426,21 @@ public abstract class RedeliveryErrorHan
}
/**
+ * Creates a defensive copy of the exchange, but only when redelivery is enabled.
+ *
+ * @param exchange the exchange
+ * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
+ */
+ protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) {
+ if (!redeliveryEnabled) {
+ // no redelivery can happen, so the copy is not needed
+ return null;
+ }
+ // redelivery may happen, so keep a copy of the exchange to redeliver from
+ return ExchangeHelper.createCopy(exchange, true);
+ }
+
+ /**
* Strategy whether the exchange has an exception that we should try to handle.
* <p/>
* Standard implementations should just look for an exception.
@@ -536,6 +548,9 @@ public abstract class RedeliveryErrorHan
}
protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
+ // there must be a defensive copy of the exchange
+ ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this);
+
// okay we will give it another go so clear the exception so we can try again
exchange.setException(null);
@@ -669,7 +684,7 @@ public abstract class RedeliveryErrorHan
exchange.setOut(null);
}
}
-
+
// reset cached streams so they can be read again
MessageHelper.resetStreamCache(exchange.getIn());
@@ -913,6 +928,55 @@ public abstract class RedeliveryErrorHan
}
}
+ /**
+ * Determines if redelivery is enabled by checking if any of the redelivery policy
+ * settings may allow redeliveries, either on this error handler or on its exception policies.
+ * <p/>
+ * The result decides whether a defensive copy of the Exchange is made, so a policy
+ * that may redeliver must not be missed.
+ *
+ * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
+ * @throws Exception can be thrown
+ */
+ private boolean determineIfRedeliveryIsEnabled() throws Exception {
+ // determine if redelivery is enabled on the error handler itself
+ if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
+ // must check for != 0 as (-1 means redeliver forever)
+ return true;
+ }
+ // a retry while predicate also counts as redelivery being enabled
+ if (retryWhilePolicy != null) {
+ return true;
+ }
+
+ // or on the exception policies, walk them to see if any of them may redeliver
+ for (OnExceptionDefinition def : exceptionPolicies.values()) {
+
+ // check the ref on its own, as a policy may be referenced without an inline policy
+ String ref = def.getRedeliveryPolicyRef();
+ if (ref != null) {
+ // lookup in registry if ref provided
+ RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class);
+ if (policy.getMaximumRedeliveries() != 0) {
+ // must check for != 0 as (-1 means redeliver forever)
+ return true;
+ }
+ } else if (def.getRedeliveryPolicy() != null) {
+ Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries());
+ if (max != null && max != 0) {
+ // must check for != 0 as (-1 means redeliver forever)
+ return true;
+ }
+ }
+
+ if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
@Override
protected void doStart() throws Exception {
ServiceHelper.startServices(output, outputAsync, deadLetter);
@@ -928,6 +992,12 @@ public abstract class RedeliveryErrorHan
executorService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "ErrorHandlerRedeliveryTask");
}
}
+
+ // determine if redeliver is enabled or not
+ redeliveryEnabled = determineIfRedeliveryIsEnabled();
+ if (log.isDebugEnabled()) {
+ log.debug("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this);
+ }
}
@Override