You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/01/08 20:01:23 UTC

svn commit: r732793 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/builder/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/interceptor/

Author: davsclaus
Date: Thu Jan  8 11:01:22 2009
New Revision: 732793

URL: http://svn.apache.org/viewvc?rev=732793&view=rev
Log:
CAMEL-1234: Added onRedelivery option to DeadLetterChannel to allow processing a custom processor before every redelivery attempt.

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java   (contents, props changed)
      - copied, changed from r732654, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=732793&r1=732792&r2=732793&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Thu Jan  8 11:01:22 2009
@@ -38,6 +38,7 @@
  */
 public class DeadLetterChannelBuilder extends ErrorHandlerBuilderSupport {
     private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+    private Processor onRedelivery;
     private ExceptionPolicyStrategy exceptionPolicyStrategy = ErrorHandlerSupport.createDefaultExceptionPolicyStrategy();
     private ProcessorFactory deadLetterFactory;
     private Processor defaultDeadLetterEndpoint;
@@ -63,8 +64,8 @@
     }
 
     public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception {
-        Processor deadLetter = getDeadLetterFactory().createProcessor();       
-        DeadLetterChannel answer = new DeadLetterChannel(processor, deadLetter, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
+        Processor deadLetter = getDeadLetterFactory().createProcessor();
+        DeadLetterChannel answer = new DeadLetterChannel(processor, deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
         configure(answer);
         return answer;
     }
@@ -162,6 +163,16 @@
         return this;
     }
 
+    /**
+     * Sets a processor that should be processed <b>before</b> a redelivey attempt.
+     * <p/>
+     * Can be used to change the {@link org.apache.camel.Exchange} <b>before</b> its being redelivered.
+     */
+    public DeadLetterChannelBuilder onRedelivery(Processor processor) {
+        setOnRedelivery(processor);
+        return this;
+    }
+
     // Properties
     // -------------------------------------------------------------------------
     public RedeliveryPolicy getRedeliveryPolicy() {
@@ -260,6 +271,14 @@
         this.exceptionPolicyStrategy = exceptionPolicyStrategy;
     }
 
+    public Processor getOnRedelivery() {
+        return onRedelivery;
+    }
+
+    public void setOnRedelivery(Processor onRedelivery) {
+        this.onRedelivery = onRedelivery;
+    }
+
     @Override
     public String toString() {
         return "DeadLetterChannelBuilder(" + (deadLetterFactory != null ? deadLetterFactory : defaultDeadLetterEndpoint) + ")";

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=732793&r1=732792&r2=732793&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Thu Jan  8 11:01:22 2009
@@ -58,6 +58,7 @@
     private AsyncProcessor outputAsync;
     private RedeliveryPolicy redeliveryPolicy;
     private Logger logger;
+    private Processor redeliveryProcessor;
 
     private class RedeliveryData {
         int redeliveryCounter;
@@ -105,22 +106,17 @@
         } 
     }
 
-    public DeadLetterChannel(Processor output, Processor deadLetter) {
-        this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(),
-            ErrorHandlerSupport.createDefaultExceptionPolicyStrategy());
-    }
-
-    public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) {
-        this.deadLetter = deadLetter;
+    public DeadLetterChannel(Processor output, Processor deadLetter, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) {
         this.output = output;
+        this.deadLetter = deadLetter;
+        this.redeliveryProcessor = redeliveryProcessor;
         this.outputAsync = AsyncProcessorTypeConverter.convert(output);
-
         this.redeliveryPolicy = redeliveryPolicy;
         this.logger = logger;
         setExceptionPolicy(exceptionPolicyStrategy);
     }
 
-    public static <E extends Exchange> Logger createDefaultLogger() {
+    public static Logger createDefaultLogger() {
         return new Logger(LOG, LoggingLevel.ERROR);
     }
 
@@ -170,7 +166,6 @@
             boolean shouldRedeliver = shouldRedeliver(exchange, data);
             if (!shouldRedeliver) {
                 return deliverToFaultProcessor(exchange, callback, data);
-                
             }
 
             // if we are redelivering then sleep before trying again
@@ -182,6 +177,9 @@
 
                 // wait until we should redeliver
                 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
+
+                // letting onRedeliver be executed
+                deliverToRedeliveryProcessor(exchange, callback, data);
             }
 
             // process the exchange
@@ -257,7 +255,10 @@
             // wait until we should redeliver
             data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
             timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
-        }        
+
+            // letting onRedeliver be executed
+            deliverToRedeliveryProcessor(exchange, callback, data);
+        }
     }
     
     private void handleException(Exchange exchange, RedeliveryData data) {
@@ -274,13 +275,37 @@
             Processor processor = exceptionPolicy.getErrorHandler();
             if (processor != null) {
                 data.failureProcessor = processor;
-            }                    
+            }
         }
-        
-        logFailedDelivery(true, exchange, "Failed delivery for exchangeId: " + exchange.getExchangeId()
-                + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, data, e);
+
+        String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
+                + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
+        logFailedDelivery(true, exchange, msg, data, e);
+
         data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
-        
+    }
+
+    /**
+     * Gives an optional configure redelivery processor a chance to process before the Exchange
+     * will be redelivered. This can be used to alter the Exchange.
+     */
+    private boolean deliverToRedeliveryProcessor(final Exchange exchange, final AsyncCallback callback,
+                                            final RedeliveryData data) {
+        if (redeliveryProcessor == null) {
+            return true;
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is processing Exchange before its redelivered");
+        }
+        AsyncProcessor afp = AsyncProcessorTypeConverter.convert(redeliveryProcessor);
+        boolean sync = afp.process(exchange, new AsyncCallback() {
+            public void done(boolean sync) {
+                callback.done(data.sync);
+            }
+        });
+
+        return sync;
     }
     
     private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback,
@@ -299,11 +324,10 @@
             }
         });
 
-        // The line below shouldn't be needed, it is invoked by the AsyncCallback above
-        // restoreExceptionOnExchange(exchange, data.handledPredicate);
-        logFailedDelivery(false, exchange, "Failed delivery for exchangeId: " + exchange.getExchangeId()
-                                           + ". Handled by the failure processor: " + data.failureProcessor,
-                          data, null);
+        String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
+                + ". Handled by the failure processor: " + data.failureProcessor;
+        logFailedDelivery(false, exchange, msg, data, null);
+
         return sync;
     }
 

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java (from r732654, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java&r1=732654&r2=732793&rev=732793&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java Thu Jan  8 11:01:22 2009
@@ -14,23 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.processor.interceptor;
+package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.processor.onexception.MyTechnicalException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.onexception.MyTechnicalException;
 
 /**
  * Unit test for testing possibility to modify exchange before redelivering
  */
-public class InterceptAlterMessageBeforeRedeliveryTest extends ContextTestSupport {
+public class DeadLetterChannelOnRedeliveryTest extends ContextTestSupport {
 
     static int counter;
 
-    public void testInterceptAlterMessageBeforeRedelivery() throws Exception {
+    public void testOnExceptionAlterMessageBeforeRedelivery() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World123");
 
@@ -39,7 +39,7 @@
         assertMockEndpointsSatisfied();
     }
 
-    public void testInterceptAlterMessageWithHeadersBeforeRedelivery() throws Exception {
+    public void testOnExceptionAlterMessageWithHeadersBeforeRedelivery() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World123");
         mock.expectedHeaderReceived("foo", "123");
@@ -60,25 +60,14 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // to execute unit test much faster we dont use delay between redeliveries
-                errorHandler(deadLetterChannel("mock:error").delay(0L));
-
                 // START SNIPPET: e1
-                // we configure an interceptor that is triggered when the redelivery flag
-                // has been set TRUE on an exchange
-                intercept().when(header("org.apache.camel.Redelivered").isEqualTo(Boolean.TRUE)).
-                        process(new Processor() {
-                            public void process(Exchange exchange) throws Exception {
-                                // the message is being redelivered so we can alter it
-
-                                // we just append the redelivery counter to the body
-                                // you can of course do all kind of stuff instead
-                                String body = exchange.getIn().getBody(String.class);
-                                int count = exchange.getIn().getHeader("org.apache.camel.RedeliveryCounter", Integer.class);
-
-                                exchange.getIn().setBody(body + count);
-                            }
-                        });
+                // we configure our Dead Letter Channel to invoke
+                // MyRedeliveryProcessor before a redelivery is
+                // attempted. This allows us to alter the message before
+                errorHandler(deadLetterChannel("mock:error")
+                        .onRedelivery(new MyRedeliverPrcessor())
+                        // setting delay to zero is just to make unit teting faster
+                        .delay(0L));
                 // END SNIPPET: e1
 
 
@@ -95,5 +84,23 @@
         };
     }
 
+    // START SNIPPET: e2
+    // This is our processor that is executed before every redelivery attempt
+    // here we can do what we want in the java code, such as altering the message
+    public class MyRedeliverPrcessor implements Processor {
+
+        public void process(Exchange exchange) throws Exception {
+            // the message is being redelivered so we can alter it
+
+            // we just append the redelivery counter to the body
+            // you can of course do all kind of stuff instead
+            String body = exchange.getIn().getBody(String.class);
+            int count = exchange.getIn().getHeader("org.apache.camel.RedeliveryCounter", Integer.class);
+
+            exchange.getIn().setBody(body + count);
+        }
+    }
+    // END SNIPPET: e2
+
 
 }
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java?rev=732793&r1=732792&r2=732793&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java Thu Jan  8 11:01:22 2009
@@ -19,9 +19,9 @@
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.processor.onexception.MyTechnicalException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.onexception.MyTechnicalException;
 
 /**
  * Unit test for testing possibility to modify exchange before redelivering