You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/20 19:03:46 UTC

svn commit: r1137705 - in /camel/trunk/camel-core/src/main/java/org/apache/camel/processor: ErrorHandlerSupport.java RedeliveryErrorHandler.java

Author: davsclaus
Date: Mon Jun 20 17:03:45 2011
New Revision: 1137705

URL: http://svn.apache.org/viewvc?rev=1137705&view=rev
Log:
CAMEL-1817: Optimized to only do defensive copy of Exchange if redelivery has been enabled.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java?rev=1137705&r1=1137704&r2=1137705&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java Mon Jun 20 17:03:45 2011
@@ -41,8 +41,8 @@ public abstract class ErrorHandlerSuppor
 
     protected final transient Logger log = LoggerFactory.getLogger(getClass());
 
-    private final Map<ExceptionPolicyKey, OnExceptionDefinition> exceptionPolicies = new LinkedHashMap<ExceptionPolicyKey, OnExceptionDefinition>();
-    private ExceptionPolicyStrategy exceptionPolicy = createDefaultExceptionPolicyStrategy();
+    protected final Map<ExceptionPolicyKey, OnExceptionDefinition> exceptionPolicies = new LinkedHashMap<ExceptionPolicyKey, OnExceptionDefinition>();
+    protected ExceptionPolicyStrategy exceptionPolicy = createDefaultExceptionPolicyStrategy();
 
     public void addExceptionPolicy(OnExceptionDefinition exceptionType) {
         Processor processor = exceptionType.getErrorHandler();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1137705&r1=1137704&r2=1137705&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Mon Jun 20 17:03:45 2011
@@ -33,6 +33,7 @@ import org.apache.camel.impl.converter.A
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.spi.SubUnitOfWorkCallback;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.MessageHelper;
@@ -46,7 +47,7 @@ import org.apache.camel.util.ServiceHelp
  * This implementation should contain all the error handling logic and the sub classes
  * should only configure it according to what they support.
  *
- * @version 
+ * @version
  */
 public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor {
 
@@ -62,6 +63,7 @@ public abstract class RedeliveryErrorHan
     protected final Predicate retryWhilePolicy;
     protected final CamelLogger logger;
     protected final boolean useOriginalMessagePolicy;
+    protected boolean redeliveryEnabled;
 
     /**
      * Contains the current redelivery data
@@ -88,7 +90,7 @@ public abstract class RedeliveryErrorHan
     /**
      * Tasks which performs asynchronous redelivery attempts, and being triggered by a
      * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
-     * has to be delayed before a redelivery attempt is performed. 
+     * has to be delayed before a redelivery attempt is performed.
      */
     private class AsyncRedeliveryTask implements Callable<Boolean> {
 
@@ -171,8 +173,8 @@ public abstract class RedeliveryErrorHan
         }
     }
 
-    public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, 
-            Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter, 
+    public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
+            Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
             String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile, String executorServiceRef) {
 
         ObjectHelper.notNull(camelContext, "CamelContext", this);
@@ -207,11 +209,6 @@ public abstract class RedeliveryErrorHan
         return processErrorHandler(exchange, callback, new RedeliveryData());
     }
 
-    protected Exchange defensiveCopyExchange(Exchange exchange) {
-        // TODO: Optimize to only copy if redelivery is possible/enabled
-        return ExchangeHelper.createCopy(exchange, true);
-    }
-
     /**
      * Process the exchange using redelivery error handling.
      */
@@ -219,7 +216,7 @@ public abstract class RedeliveryErrorHan
 
         // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
         // original Exchange is being redelivered, and not a mutated Exchange
-        data.original = defensiveCopyExchange(exchange);
+        data.original = defensiveCopyExchangeIfNeeded(exchange);
 
         // use looping to have redelivery attempts
         while (true) {
@@ -429,6 +426,21 @@ public abstract class RedeliveryErrorHan
     }
 
     /**
+     * Performs a defensive copy of the exchange if needed
+     *
+     * @param exchange the exchange
+     * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
+     */
+    protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) {
+        // only do a defensive copy if redelivery is enabled
+        if (redeliveryEnabled) {
+            return ExchangeHelper.createCopy(exchange, true);
+        } else {
+            return null;
+        }
+    }
+
+    /**
      * Strategy whether the exchange has an exception that we should try to handle.
      * <p/>
      * Standard implementations should just look for an exception.
@@ -536,6 +548,9 @@ public abstract class RedeliveryErrorHan
     }
 
     protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
+        // there must be a defensive copy of the exchange
+        ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this);
+
         // okay we will give it another go so clear the exception so we can try again
         exchange.setException(null);
 
@@ -669,7 +684,7 @@ public abstract class RedeliveryErrorHan
                     exchange.setOut(null);
                 }
             }
-            
+
             // reset cached streams so they can be read again
             MessageHelper.resetStreamCache(exchange.getIn());
 
@@ -913,6 +928,55 @@ public abstract class RedeliveryErrorHan
         }
     }
 
+    /**
+     * Determines if redelivery is enabled by checking if any of the redelivery policy
+     * settings may allow redeliveries.
+     *
+     * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
+     * @throws Exception can be thrown
+     */
+    private boolean determineIfRedeliveryIsEnabled() throws Exception {
+        // determine if redeliver is enabled either on error handler
+        if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
+            // must check for != 0 as (-1 means redeliver forever)
+            return true;
+        }
+        if (retryWhilePolicy != null) {
+            return true;
+        }
+
+        // or on the exception policies
+        if (!exceptionPolicies.isEmpty()) {
+            // walk them to see if any of them have a maximum redeliveries > 0 or retry until set
+            for (OnExceptionDefinition def : exceptionPolicies.values()) {
+
+                if (def.getRedeliveryPolicy() != null) {
+                    String ref = def.getRedeliveryPolicyRef();
+                    if (ref != null) {
+                        // lookup in registry if ref provided
+                        RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class);
+                        if (policy.getMaximumRedeliveries() != 0) {
+                            // must check for != 0 as (-1 means redeliver forever)
+                            return true;
+                        }
+                    } else {
+                        Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries());
+                        if (max != null && max != 0) {
+                            // must check for != 0 as (-1 means redeliver forever)
+                            return true;
+                        }
+                    }
+                }
+
+                if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startServices(output, outputAsync, deadLetter);
@@ -928,6 +992,12 @@ public abstract class RedeliveryErrorHan
                 executorService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "ErrorHandlerRedeliveryTask");
             }
         }
+
+        // determine if redeliver is enabled or not
+        redeliveryEnabled = determineIfRedeliveryIsEnabled();
+        if (log.isDebugEnabled()) {
+            log.debug("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this);
+        }
     }
 
     @Override