You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/06/18 04:42:31 UTC
svn commit: r955817 - in /camel/branches/camel-1.x/camel-core/src:
main/java/org/apache/camel/processor/DeadLetterChannel.java
test/java/org/apache/camel/processor/DeadLetterChannelTest.java
Author: ningjiang
Date: Fri Jun 18 02:42:31 2010
New Revision: 955817
URL: http://svn.apache.org/viewvc?rev=955817&view=rev
Log:
CAMEL-2826 Fixed the issue where message content redelivered asynchronously by the DeadLetterChannel (DLC) was not re-readable; applied the patch with thanks to Ron
Modified:
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=955817&r1=955816&r2=955817&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Fri Jun 18 02:42:31 2010
@@ -255,6 +255,10 @@ public class DeadLetterChannel extends E
if (exchange.getException() != null) {
exchange.setException(null);
}
+
+ // reset cached streams so they can be read again
+ MessageHelper.resetStreamCache(exchange.getIn());
+
// wait until we should redeliver
data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=955817&r1=955816&r2=955817&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Fri Jun 18 02:42:31 2010
@@ -16,6 +16,10 @@
*/
package org.apache.camel.processor;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.ContextTestSupport;
@@ -24,6 +28,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.IOConverter;
import org.apache.camel.model.LoggingLevel;
/**
@@ -48,7 +53,7 @@ public class DeadLetterChannelTest exten
assertMockEndpointsSatisfied();
}
- public void testLotsOfAttemptsFail() throws Exception {
+ public void testLotsOfAttemptsFailWithInputStreamPayload() throws Exception {
failUntilAttempt = 5;
deadEndpoint.expectedBodiesReceived(body);
@@ -56,7 +61,7 @@ public class DeadLetterChannelTest exten
deadEndpoint.message(0).header(DeadLetterChannel.REDELIVERY_COUNTER).isEqualTo(2);
successEndpoint.expectedMessageCount(0);
- sendBody("direct:start", body);
+ sendBody("direct:start", new ByteArrayInputStream(body.getBytes()));
assertMockEndpointsSatisfied();
@@ -82,6 +87,7 @@ public class DeadLetterChannelTest exten
public void process(Exchange exchange) {
Integer counter = exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
Integer.class);
+ checkBody(exchange.getIn().getBody());
int attempt = (counter == null) ? 1 : counter + 1;
if (attempt < failUntilAttempt) {
throw new RuntimeException("Failed to process due to attempt: " + attempt
@@ -92,6 +98,7 @@ public class DeadLetterChannelTest exten
public boolean process(Exchange exchange, AsyncCallback callback) {
Integer counter = exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
Integer.class);
+ checkBody(exchange.getIn().getBody());
int attempt = (counter == null) ? 1 : counter + 1;
if (attempt > 1) {
assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(), "Camel DeadLetterChannel Redeliver Timer");
@@ -105,6 +112,17 @@ public class DeadLetterChannelTest exten
callback.done(false);
return false;
}
+
+ private void checkBody(Object body) {
+ try {
+ assertEquals("Ensure message re-readability in the error handler",
+ DeadLetterChannelTest.this.body,
+ (body instanceof InputStream) ? new String(IOConverter.toBytes((InputStream)body)) : body);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ }
+
};
return new RouteBuilder() {