You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/03/12 06:29:54 UTC

svn commit: r752775 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java test/java/org/apache/camel/processor/IdempotentConsumerTest.java

Author: davsclaus
Date: Thu Mar 12 05:29:53 2009
New Revision: 752775

URL: http://svn.apache.org/viewvc?rev=752775&view=rev
Log:
CAMEL-1451: IdempotentConsumer now adds to the repo after succesful processing.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=752775&r1=752774&r2=752775&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Thu Mar 12 05:29:53 2009
@@ -34,12 +34,12 @@
  */
 public class IdempotentConsumer extends ServiceSupport implements Processor {
     private static final transient Log LOG = LogFactory.getLog(IdempotentConsumer.class);
-    private Expression messageIdExpression;
-    private Processor nextProcessor;
-    private IdempotentRepository idempotentRepository;
+    private final Expression messageIdExpression;
+    private final Processor nextProcessor;
+    private final IdempotentRepository idempotentRepository;
 
-    public IdempotentConsumer(Expression messageIdExpression, 
-            IdempotentRepository idempotentRepository, Processor nextProcessor) {
+    public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository,
+                              Processor nextProcessor) {
         this.messageIdExpression = messageIdExpression;
         this.idempotentRepository = idempotentRepository;
         this.nextProcessor = nextProcessor;
@@ -57,10 +57,19 @@
         if (messageId == null) {
             throw new NoMessageIdException(exchange, messageIdExpression);
         }
-        if (idempotentRepository.add(messageId)) {
-            nextProcessor.process(exchange);
-        } else {
+
+        if (idempotentRepository.contains(messageId)) {
             onDuplicateMessage(exchange, messageId);
+        } else {
+            // process it first
+            nextProcessor.process(exchange);
+
+            // then test wheter it was failed or not
+            if (!exchange.isFailed()) {
+                onCompletedMessage(exchange, messageId);
+            } else {
+                onFailedMessage(exchange, messageId);
+            }
         }
     }
 
@@ -101,4 +110,32 @@
             LOG.debug("Ignoring duplicate message with id: " + messageId + " for exchange: " + exchange);
         }
     }
+
+    /**
+     * A strategy method to allow derived classes to overload the behaviour of
+     * processing a completed message
+     *
+     * @param exchange the exchange
+     * @param messageId the message ID of this exchange
+     */
+    @SuppressWarnings("unchecked")
+    protected void onCompletedMessage(Exchange exchange, String messageId) {
+        idempotentRepository.add(messageId);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Added to repository with id: " + messageId + " for exchange: " + exchange);
+        }
+    }
+
+    /**
+     * A strategy method to allow derived classes to overload the behaviour of
+     * processing a failed message
+     *
+     * @param exchange the exchange
+     * @param messageId the message ID of this exchange
+     */
+    protected void onFailedMessage(Exchange exchange, String messageId) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Not added to repository as exchange failed: " + exchange + " with id: " + messageId);
+        }
+    }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java?rev=752775&r1=752774&r2=752775&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java Thu Mar 12 05:29:53 2009
@@ -23,7 +23,6 @@
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-
 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
 
 /**
@@ -33,7 +32,22 @@
     protected Endpoint startEndpoint;
     protected MockEndpoint resultEndpoint;
 
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
     public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(
+                        header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
+                ).to("mock:result");
+            }
+        });
+        context.start();
+
         resultEndpoint.expectedBodiesReceived("one", "two", "three");
 
         sendMessage("1", "one");
@@ -43,7 +57,41 @@
         sendMessage("1", "one");
         sendMessage("3", "three");
 
-        resultEndpoint.assertIsSatisfied();
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testFailedExchangesNotAdded() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).delay(0).logStackTrace(false));
+
+                from("direct:start").idempotentConsumer(
+                        header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
+                ).process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String id = exchange.getIn().getHeader("messageId", String.class);
+                        if (id.equals("2")) {
+                            throw new IllegalArgumentException("Damm I cannot handle id 2");
+                        }
+                    }
+                }).to("mock:result");
+            }
+        });
+        context.start();
+
+        // we send in 2 messages with id 2 that fails
+        getMockEndpoint("mock:error").expectedMessageCount(2);
+        resultEndpoint.expectedBodiesReceived("one", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
     }
 
     protected void sendMessage(final Object messageId, final Object body) {
@@ -65,13 +113,4 @@
         resultEndpoint = getMockEndpoint("mock:result");
     }
 
-    protected RouteBuilder createRouteBuilder() {
-        return new RouteBuilder() {
-            public void configure() {
-                from("direct:start").idempotentConsumer(
-                        header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
-                ).to("mock:result");
-            }
-        };
-    }
 }