You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/03/12 06:29:54 UTC
svn commit: r752775 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
test/java/org/apache/camel/processor/IdempotentConsumerTest.java
Author: davsclaus
Date: Thu Mar 12 05:29:53 2009
New Revision: 752775
URL: http://svn.apache.org/viewvc?rev=752775&view=rev
Log:
CAMEL-1451: IdempotentConsumer now adds to the repo after successful processing.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=752775&r1=752774&r2=752775&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Thu Mar 12 05:29:53 2009
@@ -34,12 +34,12 @@
*/
public class IdempotentConsumer extends ServiceSupport implements Processor {
private static final transient Log LOG = LogFactory.getLog(IdempotentConsumer.class);
- private Expression messageIdExpression;
- private Processor nextProcessor;
- private IdempotentRepository idempotentRepository;
+ private final Expression messageIdExpression;
+ private final Processor nextProcessor;
+ private final IdempotentRepository idempotentRepository;
- public IdempotentConsumer(Expression messageIdExpression,
- IdempotentRepository idempotentRepository, Processor nextProcessor) {
+ public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository,
+ Processor nextProcessor) {
this.messageIdExpression = messageIdExpression;
this.idempotentRepository = idempotentRepository;
this.nextProcessor = nextProcessor;
@@ -57,10 +57,19 @@
if (messageId == null) {
throw new NoMessageIdException(exchange, messageIdExpression);
}
- if (idempotentRepository.add(messageId)) {
- nextProcessor.process(exchange);
- } else {
+
+ if (idempotentRepository.contains(messageId)) {
onDuplicateMessage(exchange, messageId);
+ } else {
+ // process it first
+ nextProcessor.process(exchange);
+
+ // then test whether it failed or not
+ if (!exchange.isFailed()) {
+ onCompletedMessage(exchange, messageId);
+ } else {
+ onFailedMessage(exchange, messageId);
+ }
}
}
@@ -101,4 +110,32 @@
LOG.debug("Ignoring duplicate message with id: " + messageId + " for exchange: " + exchange);
}
}
+
+ /**
+ * A strategy method to allow derived classes to override the behaviour of
+ * processing a completed message
+ *
+ * @param exchange the exchange
+ * @param messageId the message ID of this exchange
+ */
+ @SuppressWarnings("unchecked")
+ protected void onCompletedMessage(Exchange exchange, String messageId) {
+ idempotentRepository.add(messageId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added to repository with id: " + messageId + " for exchange: " + exchange);
+ }
+ }
+
+ /**
+ * A strategy method to allow derived classes to override the behaviour of
+ * processing a failed message
+ *
+ * @param exchange the exchange
+ * @param messageId the message ID of this exchange
+ */
+ protected void onFailedMessage(Exchange exchange, String messageId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not added to repository as exchange failed: " + exchange + " with id: " + messageId);
+ }
+ }
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java?rev=752775&r1=752774&r2=752775&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java Thu Mar 12 05:29:53 2009
@@ -23,7 +23,6 @@
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-
import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
/**
@@ -33,7 +32,22 @@
protected Endpoint startEndpoint;
protected MockEndpoint resultEndpoint;
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
public void testDuplicateMessagesAreFilteredOut() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").idempotentConsumer(
+ header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
+ ).to("mock:result");
+ }
+ });
+ context.start();
+
resultEndpoint.expectedBodiesReceived("one", "two", "three");
sendMessage("1", "one");
@@ -43,7 +57,41 @@
sendMessage("1", "one");
sendMessage("3", "three");
- resultEndpoint.assertIsSatisfied();
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testFailedExchangesNotAdded() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).delay(0).logStackTrace(false));
+
+ from("direct:start").idempotentConsumer(
+ header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
+ ).process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String id = exchange.getIn().getHeader("messageId", String.class);
+ if (id.equals("2")) {
+ throw new IllegalArgumentException("Damm I cannot handle id 2");
+ }
+ }
+ }).to("mock:result");
+ }
+ });
+ context.start();
+
+ // we send in 2 messages with id 2 that fail
+ getMockEndpoint("mock:error").expectedMessageCount(2);
+ resultEndpoint.expectedBodiesReceived("one", "three");
+
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("1", "one");
+ sendMessage("3", "three");
+
+ assertMockEndpointsSatisfied();
}
protected void sendMessage(final Object messageId, final Object body) {
@@ -65,13 +113,4 @@
resultEndpoint = getMockEndpoint("mock:result");
}
- protected RouteBuilder createRouteBuilder() {
- return new RouteBuilder() {
- public void configure() {
- from("direct:start").idempotentConsumer(
- header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
- ).to("mock:result");
- }
- };
- }
}