You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/09/15 08:47:11 UTC
svn commit: r997200 -
/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java
Author: davsclaus
Date: Wed Sep 15 06:47:11 2010
New Revision: 997200
URL: http://svn.apache.org/viewvc?rev=997200&view=rev
Log:
Added unit test for using custom idempotent consumer repo.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java
- copied, changed from r996907, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java (from r996907, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java&r1=996907&r2=997200&rev=997200&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java Wed Sep 15 06:47:11 2010
@@ -16,6 +16,9 @@
*/
package org.apache.camel.processor;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -23,142 +26,48 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
/**
* @version $Revision$
*/
-public class IdempotentConsumerTest extends ContextTestSupport {
+public class IdempotentConsumerUsingCustomRepositoryTest extends ContextTestSupport {
protected Endpoint startEndpoint;
protected MockEndpoint resultEndpoint;
+ protected IdempotentRepository<String> customRepo = new MyRepo();
@Override
- public boolean isUseRouteBuilder() {
- return false;
- }
-
- public void testDuplicateMessagesAreFilteredOut() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:start").idempotentConsumer(
- header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
- ).to("mock:result");
- }
- });
- context.start();
-
- resultEndpoint.expectedBodiesReceived("one", "two", "three");
-
- sendMessage("1", "one");
- sendMessage("2", "two");
- sendMessage("1", "one");
- sendMessage("2", "two");
- sendMessage("1", "one");
- sendMessage("3", "three");
-
- assertMockEndpointsSatisfied();
- }
-
- public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception {
- context.addRoutes(new RouteBuilder() {
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
@Override
public void configure() throws Exception {
- errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false));
-
- from("direct:start").idempotentConsumer(
- header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
- ).process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- String id = exchange.getIn().getHeader("messageId", String.class);
- if (id.equals("2")) {
- throw new IllegalArgumentException("Damm I cannot handle id 2");
- }
- }
- }).to("mock:result");
+ from("direct:start")
+ .idempotentConsumer(header("messageId"), customRepo)
+ .to("mock:result");
}
- });
- context.start();
-
- // we send in 2 messages with id 2 that fails
- getMockEndpoint("mock:error").expectedMessageCount(2);
- resultEndpoint.expectedBodiesReceived("one", "three");
-
- sendMessage("1", "one");
- sendMessage("2", "two");
- sendMessage("1", "one");
- sendMessage("2", "two");
- sendMessage("1", "one");
- sendMessage("3", "three");
-
- assertMockEndpointsSatisfied();
+ };
}
- public void testFailedExchangesNotAddedDeadLetterChannelNotHandled() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- errorHandler(deadLetterChannel("mock:error").handled(false).maximumRedeliveries(2).redeliveryDelay(0).logStackTrace(false));
-
- from("direct:start").idempotentConsumer(
- header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
- ).process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- String id = exchange.getIn().getHeader("messageId", String.class);
- if (id.equals("2")) {
- throw new IllegalArgumentException("Damm I cannot handle id 2");
- }
- }
- }).to("mock:result");
- }
- });
- context.start();
-
- // we send in 2 messages with id 2 that fails
- getMockEndpoint("mock:error").expectedMessageCount(2);
- resultEndpoint.expectedBodiesReceived("one", "three");
+ public void testDuplicateMessagesAreFilteredOut() throws Exception {
+ resultEndpoint.expectedBodiesReceived("one", "two", "three");
sendMessage("1", "one");
sendMessage("2", "two");
sendMessage("1", "one");
+ // 4 is already pre added in custom repo so it will be regarded as duplicate
+ sendMessage("4", "four");
sendMessage("2", "two");
sendMessage("1", "one");
sendMessage("3", "three");
assertMockEndpointsSatisfied();
- }
-
- public void testFailedExchangesNotAdded() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- // use default error handler
- errorHandler(defaultErrorHandler());
-
- from("direct:start").idempotentConsumer(
- header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
- ).process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- String id = exchange.getIn().getHeader("messageId", String.class);
- if (id.equals("2")) {
- throw new IllegalArgumentException("Damm I cannot handle id 2");
- }
- }
- }).to("mock:result");
- }
- });
- context.start();
- resultEndpoint.expectedBodiesReceived("one", "three");
-
- sendMessage("1", "one");
- sendMessage("2", "two");
- sendMessage("1", "one");
- sendMessage("2", "two");
- sendMessage("1", "one");
- sendMessage("3", "three");
-
- assertMockEndpointsSatisfied();
+ // and the custom repo now contains those keys
+ assertTrue(customRepo.contains("1"));
+ assertTrue(customRepo.contains("2"));
+ assertTrue(customRepo.contains("3"));
+ assertTrue(customRepo.contains("4"));
+ assertFalse(customRepo.contains("5"));
}
protected void sendMessage(final Object messageId, final Object body) {
@@ -175,9 +84,40 @@ public class IdempotentConsumerTest exte
@Override
protected void setUp() throws Exception {
super.setUp();
-
startEndpoint = resolveMandatoryEndpoint("direct:start");
resultEndpoint = getMockEndpoint("mock:result");
}
+ private final class MyRepo implements IdempotentRepository<String> {
+
+ private Map<String, String> cache = new HashMap<String, String>();
+
+ private MyRepo() {
+ // pre start with 4 already in there
+ cache.put("4", "4");
+ }
+
+ public boolean add(String key) {
+ if (cache.containsKey(key)) {
+ return false;
+ } else {
+ cache.put(key, key);
+ return true;
+ }
+ }
+
+ public boolean contains(String key) {
+ return cache.containsKey(key);
+ }
+
+ public boolean remove(String key) {
+ return cache.remove(key) != null;
+ }
+
+ public boolean confirm(String key) {
+ // noop
+ return true;
+ }
+ }
+
}