You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/06/18 04:42:31 UTC

svn commit: r955817 - in /camel/branches/camel-1.x/camel-core/src: main/java/org/apache/camel/processor/DeadLetterChannel.java test/java/org/apache/camel/processor/DeadLetterChannelTest.java

Author: ningjiang
Date: Fri Jun 18 02:42:31 2010
New Revision: 955817

URL: http://svn.apache.org/viewvc?rev=955817&view=rev
Log:
CAMEL-2826 Fixed the issue where message content redelivered asynchronously by the DLC was not re-readable. Applied the patch, with thanks to Ron.

Modified:
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java

Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=955817&r1=955816&r2=955817&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Fri Jun 18 02:42:31 2010
@@ -255,6 +255,10 @@ public class DeadLetterChannel extends E
             if (exchange.getException() != null) {
                 exchange.setException(null);
             }
+            
+            // reset cached streams so they can be read again
+            MessageHelper.resetStreamCache(exchange.getIn());
+            
             // wait until we should redeliver
             data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
             timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);

Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=955817&r1=955816&r2=955817&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Fri Jun 18 02:42:31 2010
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.processor;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ContextTestSupport;
@@ -24,6 +28,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.IOConverter;
 import org.apache.camel.model.LoggingLevel;
 
 /**
@@ -48,7 +53,7 @@ public class DeadLetterChannelTest exten
         assertMockEndpointsSatisfied();
     }
 
-    public void testLotsOfAttemptsFail() throws Exception {
+    public void testLotsOfAttemptsFailWithInputStreamPayload() throws Exception {
         failUntilAttempt = 5;
 
         deadEndpoint.expectedBodiesReceived(body);
@@ -56,7 +61,7 @@ public class DeadLetterChannelTest exten
         deadEndpoint.message(0).header(DeadLetterChannel.REDELIVERY_COUNTER).isEqualTo(2);
         successEndpoint.expectedMessageCount(0);
 
-        sendBody("direct:start", body);
+        sendBody("direct:start", new ByteArrayInputStream(body.getBytes()));
 
         assertMockEndpointsSatisfied();
 
@@ -82,6 +87,7 @@ public class DeadLetterChannelTest exten
             public void process(Exchange exchange) {
                 Integer counter = exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
                                                              Integer.class);
+                checkBody(exchange.getIn().getBody());
                 int attempt = (counter == null) ? 1 : counter + 1;
                 if (attempt < failUntilAttempt) {
                     throw new RuntimeException("Failed to process due to attempt: " + attempt
@@ -92,6 +98,7 @@ public class DeadLetterChannelTest exten
             public boolean process(Exchange exchange, AsyncCallback callback) {                
                 Integer counter = exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
                                                              Integer.class);
+                checkBody(exchange.getIn().getBody());
                 int attempt = (counter == null) ? 1 : counter + 1;
                 if (attempt > 1) {
                     assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(), "Camel DeadLetterChannel Redeliver Timer");
@@ -105,6 +112,17 @@ public class DeadLetterChannelTest exten
                 callback.done(false);
                 return false;
             }
+            
+            private void checkBody(Object body) {
+                try {
+                    assertEquals("Ensure message re-readability in the error handler",
+                                 DeadLetterChannelTest.this.body,
+                                 (body instanceof InputStream) ? new String(IOConverter.toBytes((InputStream)body)) : body);
+                } catch (IOException e) {
+                    fail(e.getMessage());
+                }
+            }
+            
         };
 
         return new RouteBuilder() {