You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/04/19 16:39:26 UTC
svn commit: r530429 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/component/mock/
main/java/org/apache/camel/processor/idempotent/
test/java/org/apache/camel/processor/
Author: jstrachan
Date: Thu Apr 19 07:39:21 2007
New Revision: 530429
URL: http://svn.apache.org/viewvc?view=rev&rev=530429
Log:
added more helper behaviour to the MockEndpoint for easier testing; also migrated the various processor tests to the new MockEndpoint approach to simplify testing
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChoiceTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/JoinRoutesTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Thu Apr 19 07:39:21 2007
@@ -26,6 +26,8 @@
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.HashMap;
@@ -41,13 +43,44 @@
* @version $Revision: 1.1 $
*/
public class MockEndpoint extends DefaultEndpoint<Exchange> {
+ private static final transient Log log = LogFactory.getLog(MockEndpoint.class);
private int receivedCounter;
private int expectedCount = -1;
private Map<Integer, Processor<Exchange>> processors = new HashMap<Integer, Processor<Exchange>>();
private List<Exchange> exchangesReceived = new ArrayList<Exchange>();
private List<Throwable> failures = new ArrayList<Throwable>();
+ private List<Runnable> tests = new ArrayList<Runnable>();
private CountDownLatch latch = new CountDownLatch(1);
+ public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
+ // lets only wait on the first empty endpoint
+ int count = 0;
+ for (MockEndpoint endpoint : endpoints) {
+ if (endpoint.getExpectedCount() != 0) {
+ endpoint.assertIsSatisfied();
+ count++;
+ }
+ }
+
+ for (MockEndpoint endpoint : endpoints) {
+ if (endpoint.getExpectedCount() == 0) {
+ if (count == 0) {
+ endpoint.assertIsSatisfied();
+ count++;
+ }
+ else {
+ endpoint.assertIsSatisfied(0);
+ }
+ }
+ }
+ }
+
+ public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
+ for (MockEndpoint endpoint : endpoints) {
+ endpoint.expectsMessageCount(count);
+ }
+ }
+
public MockEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
}
@@ -70,31 +103,41 @@
// Testing API
//-------------------------------------------------------------------------
+
+ /**
+ * Validates that all the available expectations on this endpoint are satisfied; or throw an exception
+ */
public void assertIsSatisfied() throws InterruptedException {
+ assertIsSatisfied(1000);
+ }
+
+ /**
+ * Validates that all the available expectations on this endpoint are satisfied; or throw an exception
+ */
+ public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
if (latch != null) {
// now lets wait for the results
latch.await(10, TimeUnit.SECONDS);
}
else if (expectedCount == 0) {
// lets wait a little bit just in case
- Thread.sleep(1000);
+ Thread.sleep(timeoutForEmptyEndpoints);
}
if (expectedCount >= 0) {
assertEquals("Expected message count", expectedCount, receivedCounter);
}
- }
- protected void assertEquals(String message, Object expectedValue, Object actualValue) {
- if (!ObjectHelper.equals(expectedValue, actualValue)) {
- throw new AssertionError(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
+ for (Runnable test : tests) {
+ test.run();
}
}
- public void reset() {
- receivedCounter = 0;
- }
-
+ /**
+ * Specifies the expected number of message exchanges that should be received by this endpoint
+ *
+ * @param expectedCount the number of message exchanges that should be expected by this endpoint
+ */
public void expectedMessageCount(int expectedCount) {
this.expectedCount = expectedCount;
if (expectedCount <= 0) {
@@ -105,6 +148,47 @@
}
}
+ /**
+ * Adds an expectation that the given body values are received by this endpoint
+ */
+ public void expectedBodiesReceived(final List bodies) {
+ expectedMessageCount(bodies.size());
+
+ expects(new Runnable() {
+ public void run() {
+ int counter = 0;
+ for (Object expectedBody : bodies) {
+ Exchange exchange = getExchangesReceived().get(counter++);
+ assertTrue("No exchange received for counter: " + counter, exchange != null);
+
+ Object actualBody = exchange.getIn().getBody();
+
+ assertEquals("Body of message: " + counter, expectedBody, actualBody);
+
+ log.debug("Received message: " + counter + " with body: " + actualBody);
+ }
+ }
+ });
+ }
+
+ /**
+ * Adds the expection which will be invoked when enough messages are received
+ */
+ public void expects(Runnable runnable) {
+ tests.add(runnable);
+ }
+
+ /**
+ * Adds an expectation that the given body values are received by this endpoint
+ */
+ public void expectedBodiesReceived(Object... bodies) {
+ List bodyList = new ArrayList();
+ for (Object body : bodies) {
+ bodyList.add(body);
+ }
+ expectedBodiesReceived(bodyList);
+ }
+
// Properties
//-------------------------------------------------------------------------
public List<Throwable> getFailures() {
@@ -119,6 +203,10 @@
return exchangesReceived;
}
+ public int getExpectedCount() {
+ return expectedCount;
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
protected synchronized void onExchange(Exchange exchange) {
@@ -136,6 +224,18 @@
}
catch (Exception e) {
failures.add(e);
+ }
+ }
+
+ protected void assertEquals(String message, Object expectedValue, Object actualValue) {
+ if (!ObjectHelper.equals(expectedValue, actualValue)) {
+ throw new AssertionError(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
+ }
+ }
+
+ protected void assertTrue(String message, boolean predicate) {
+ if (!predicate) {
+ throw new AssertionError(message);
}
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Thu Apr 19 07:39:21 2007
@@ -20,7 +20,9 @@
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ExpressionHelper;
+import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,7 +32,7 @@
*
* @version $Revision: 1.1 $
*/
-public class IdempotentConsumer<E extends Exchange> implements Processor<E> {
+public class IdempotentConsumer<E extends Exchange> extends ServiceSupport implements Processor<E> {
private static final transient Log log = LogFactory.getLog(IdempotentConsumer.class);
private Expression<E> messageIdExpression;
private Processor<E> nextProcessor;
@@ -72,6 +74,18 @@
public Processor<E> getNextProcessor() {
return nextProcessor;
+ }
+
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(nextProcessor);
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(nextProcessor);
}
/**
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChoiceTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChoiceTest.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChoiceTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChoiceTest.java Thu Apr 19 07:39:21 2007
@@ -16,92 +16,62 @@
*/
package org.apache.camel.processor;
-import junit.framework.TestCase;
+import static org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied;
+import static org.apache.camel.component.mock.MockEndpoint.expectsMessageCount;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.TestSupport;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.queue.QueueEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.ProducerCache;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
/**
* @version $Revision: 1.1 $
*/
-public class ChoiceTest extends TestCase {
- private static final transient Log log = LogFactory.getLog(ChoiceTest.class);
-
- protected CamelContext container = new DefaultCamelContext();
- protected CountDownLatch latch = new CountDownLatch(1);
- protected Endpoint<Exchange> endpoint;
+public class ChoiceTest extends TestSupport {
+ protected CamelContext context = new DefaultCamelContext();
protected ProducerCache<Exchange> client = new ProducerCache<Exchange>();
- protected List<String> receivedBodies = new ArrayList<String>();
+ protected Endpoint<Exchange> startEndpoint;
+ protected MockEndpoint x, y, z;
public void testSendToFirstWhen() throws Exception {
- sendMessage("bar", "one");
- waitForMessageInQueue("b");
- assertQueueContains("b", "one");
- assertQueueEmpty("c");
- assertQueueEmpty("d");
- }
+ x.expectedBodiesReceived("one");
+ expectsMessageCount(0, y, z);
- public void testSendToSecondWhen() throws Exception {
- sendMessage("cheese", "two");
- waitForMessageInQueue("c");
- assertQueueEmpty("b");
- assertQueueContains("c", "two");
- assertQueueEmpty("d");
- }
+ sendMessage("bar", "one");
- public void testSendToOtherwiseClause() throws Exception {
- sendMessage("somethingUndefined", "two");
- waitForMessageInQueue("d");
- assertQueueEmpty("b");
- assertQueueEmpty("c");
- assertQueueContains("d", "two");
+ assertIsSatisfied(x, y, z);
}
- protected void assertQueueContains(String name, String expectedBody) {
- QueueEndpoint endpoint = getQueue(name);
- BlockingQueue queue = endpoint.getQueue();
- assertEquals("Queue size for: " + name + " but was: " + queue, 1, queue.size());
+ public void testSendToSecondWhen() throws Exception {
+ y.expectedBodiesReceived("two");
+ expectsMessageCount(0, x, z);
- Exchange exchange = (Exchange) queue.peek();
- Object firstBody = exchange.getIn().getBody();
- assertEquals("First body", expectedBody, firstBody);
- }
+ sendMessage("cheese", "two");
- protected void assertQueueEmpty(String name) {
- QueueEndpoint queue = getQueue(name);
- assertEquals("Queue size for: " + name, 0, queue.getQueue().size());
+ assertIsSatisfied(x, y, z);
}
+ public void testSendToOtherwiseClause() throws Exception {
+ z.expectedBodiesReceived("three");
+ expectsMessageCount(0, x, y);
- protected void waitForMessageInQueue(String name) throws InterruptedException {
- // TODO we should replace with actual processors on each queue using a latch
- QueueEndpoint endpoint = getQueue(name);
- BlockingQueue queue = endpoint.getQueue();
- for (int i = 0; i < 100 && queue.isEmpty(); i++) {
- Thread.sleep(100);
- }
- }
+ sendMessage("somethingUndefined", "three");
- protected QueueEndpoint getQueue(String name) {
- return (QueueEndpoint) container.resolveEndpoint("queue:" + name);
+ assertIsSatisfied(x, y, z);
}
-
protected void sendMessage(final Object headerValue, final Object body) throws Exception {
- client.send(endpoint, new Processor<Exchange>() {
+ client.send(startEndpoint, new Processor<Exchange>() {
public void process(Exchange exchange) {
// now lets fire in a message
Message in = exchange.getIn();
@@ -113,34 +83,24 @@
@Override
protected void setUp() throws Exception {
- final Processor<Exchange> processor = new Processor<Exchange>() {
- public void process(Exchange e) {
- Message in = e.getIn();
- String body = in.getBody(String.class);
-
- log.debug("Received body: " + body + " on exchange: " + e);
+ context.addRoutes(createRouteBuilder());
- receivedBodies.add(body);
- latch.countDown();
- }
- };
- final String endpointUri = "queue:a";
+ startEndpoint = resolveMandatoryEndpoint(context, "queue:a");
- // lets add some routes
- container.addRoutes(createRouteBuilder(endpointUri, processor));
- endpoint = container.resolveEndpoint(endpointUri);
- assertNotNull("No endpoint found for URI: " + endpointUri, endpoint);
+ x = (MockEndpoint) resolveMandatoryEndpoint(context, "mock:x");
+ y = (MockEndpoint) resolveMandatoryEndpoint(context, "mock:y");
+ z = (MockEndpoint) resolveMandatoryEndpoint(context, "mock:z");
- container.start();
+ context.start();
}
- protected RouteBuilder createRouteBuilder(final String endpointUri, final Processor<Exchange> processor) {
+ protected RouteBuilder createRouteBuilder() {
return new RouteBuilder<Exchange>() {
public void configure() {
from("queue:a").choice()
- .when(header("foo").isEqualTo("bar")).to("queue:b")
- .when(header("foo").isEqualTo("cheese")).to("queue:c")
- .otherwise().to("queue:d");
+ .when(header("foo").isEqualTo("bar")).to("mock:x")
+ .when(header("foo").isEqualTo("cheese")).to("mock:y")
+ .otherwise().to("mock:z");
}
};
}
@@ -148,6 +108,6 @@
@Override
protected void tearDown() throws Exception {
client.stop();
- container.stop();
+ context.stop();
}
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java Thu Apr 19 07:39:21 2007
@@ -17,25 +17,16 @@
*/
package org.apache.camel.processor;
-import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.Message;
-import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
import org.apache.camel.TestSupport;
-import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.util.ProducerCache;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Arrays;
+import org.apache.camel.util.ProducerCache;
/**
* @version $Revision: 1.1 $
@@ -67,7 +58,7 @@
context.addRoutes(createRouteBuilder());
startEndpoint = resolveMandatoryEndpoint(context, "queue:a");
- resultEndpoint = (MockEndpoint)resolveMandatoryEndpoint(context, "mock:result");
+ resultEndpoint = (MockEndpoint) resolveMandatoryEndpoint(context, "mock:result");
context.start();
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java Thu Apr 19 07:39:21 2007
@@ -17,38 +17,37 @@
*/
package org.apache.camel.processor;
-import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
-import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.TestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.util.ProducerCache;
import org.apache.camel.impl.DefaultCamelContext;
+import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
+import org.apache.camel.util.ProducerCache;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.List;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
/**
* @version $Revision: 1.1 $
*/
-public class IdempotentConsumerTest extends TestCase {
- private static final transient Log log = LogFactory.getLog(IdempotentConsumerTest.class);
-
+public class IdempotentConsumerTest extends TestSupport {
protected CamelContext context;
- protected CountDownLatch latch = new CountDownLatch(3);
- protected Endpoint<Exchange> endpoint;
protected ProducerCache<Exchange> client = new ProducerCache<Exchange>();
- protected List<String> receivedBodies = new ArrayList<String>();
+
+ protected Endpoint<Exchange> startEndpoint;
+ protected MockEndpoint resultEndpoint;
public void testDuplicateMessagesAreFilteredOut() throws Exception {
+ resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
sendMessage("1", "one");
sendMessage("2", "two");
sendMessage("1", "one");
@@ -56,19 +55,11 @@
sendMessage("1", "one");
sendMessage("3", "three");
- // lets wait on the message being received
- boolean received = latch.await(20, TimeUnit.SECONDS);
- assertTrue("Did not receive the message!", received);
-
- assertEquals("Should have received 3 responses: " + receivedBodies, 3, receivedBodies.size());
-
- assertEquals("received bodies", Arrays.asList(new Object[] { "one", "two", "three"}), receivedBodies);
-
- log.debug("Received bodies: " + receivedBodies);
+ resultEndpoint.assertIsSatisfied();
}
protected void sendMessage(final Object messageId, final Object body) {
- client.send(endpoint, new Processor<Exchange>() {
+ client.send(startEndpoint, new Processor<Exchange>() {
public void process(Exchange exchange) {
// now lets fire in a message
Message in = exchange.getIn();
@@ -82,23 +73,14 @@
protected void setUp() throws Exception {
context = createContext();
- final Processor<Exchange> processor = new Processor<Exchange>() {
- public void process(Exchange e) {
- Message in = e.getIn();
- String body = in.getBody(String.class);
-
- log.debug("Received body: " + body + " on exchange: " + e);
-
- receivedBodies.add(body);
- latch.countDown();
- }
- };
- final String endpointUri = "queue:test.a";
+ String fromUri = "queue:test.a";
+ String toUri = "mock:result";
// lets add some routes
- context.addRoutes(createRouteBuilder(endpointUri, processor));
- endpoint = context.resolveEndpoint(endpointUri);
- assertNotNull("No endpoint found for URI: " + endpointUri, endpoint);
+ context.addRoutes(createRouteBuilder(fromUri, toUri));
+
+ startEndpoint = resolveMandatoryEndpoint(context, fromUri);
+ resultEndpoint = (MockEndpoint) resolveMandatoryEndpoint(context, toUri);
context.start();
}
@@ -107,10 +89,10 @@
return new DefaultCamelContext();
}
- protected RouteBuilder createRouteBuilder(final String endpointUri, final Processor<Exchange> processor) {
+ protected RouteBuilder createRouteBuilder(final String fromUri, final String toUri) {
return new RouteBuilder() {
public void configure() {
- from(endpointUri).idempotentConsumer(header("messageId"), memoryMessageIdRepository()).process(processor);
+ from(fromUri).idempotentConsumer(header("messageId"), memoryMessageIdRepository()).to(toUri);
}
};
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/JoinRoutesTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/JoinRoutesTest.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/JoinRoutesTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/JoinRoutesTest.java Thu Apr 19 07:39:21 2007
@@ -16,52 +16,38 @@
*/
package org.apache.camel.processor;
-import junit.framework.TestCase;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.TestSupport;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.ProducerCache;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
/**
* @version $Revision: 1.1 $
*/
-public class JoinRoutesTest extends TestCase {
- private static final transient Log log = LogFactory.getLog(JoinRoutesTest.class);
-
- protected CamelContext container = new DefaultCamelContext();
- protected CountDownLatch latch = new CountDownLatch(3);
- protected Endpoint<Exchange> endpoint;
+public class JoinRoutesTest extends TestSupport {
+ protected CamelContext context = new DefaultCamelContext();
+ protected Endpoint<Exchange> startEndpoint;
+ protected MockEndpoint resultEndpoint;
protected ProducerCache<Exchange> client = new ProducerCache<Exchange>();
- protected List<String> receivedBodies = new ArrayList<String>();
public void testMessagesThroughDifferentRoutes() throws Exception {
+ resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
sendMessage("bar", "one");
sendMessage("cheese", "two");
sendMessage("somethingUndefined", "three");
- // now lets wait for the results
- latch.await(10, TimeUnit.SECONDS);
-
- assertEquals("Number of receives: " + receivedBodies, 3, receivedBodies.size());
- assertEquals("Received bodies", Arrays.asList(new Object[]{"one", "two", "three"}), receivedBodies);
-
- log.debug("Received on queue:e the bodies: " + receivedBodies);
+ resultEndpoint.assertIsSatisfied();
}
protected void sendMessage(final Object headerValue, final Object body) throws Exception {
- client.send(endpoint, new Processor<Exchange>() {
+ client.send(startEndpoint, new Processor<Exchange>() {
public void process(Exchange exchange) {
// now lets fire in a message
Message in = exchange.getIn();
@@ -73,28 +59,15 @@
@Override
protected void setUp() throws Exception {
- final Processor<Exchange> processor = new Processor<Exchange>() {
- public void process(Exchange e) {
- Message in = e.getIn();
- String body = in.getBody(String.class);
+ context.addRoutes(createRouteBuilder());
- log.debug("Received body: " + body + " on exchange: " + e);
+ startEndpoint = resolveMandatoryEndpoint(context, "queue:a");
+ resultEndpoint = (MockEndpoint) resolveMandatoryEndpoint(context, "mock:result");
- receivedBodies.add(body);
- latch.countDown();
- }
- };
- final String endpointUri = "queue:a";
-
- // lets add some routes
- container.addRoutes(createRouteBuilder(endpointUri, processor));
- endpoint = container.resolveEndpoint(endpointUri);
- assertNotNull("No endpoint found for URI: " + endpointUri, endpoint);
-
- container.start();
+ context.start();
}
- protected RouteBuilder createRouteBuilder(final String endpointUri, final Processor<Exchange> processor) {
+ protected RouteBuilder createRouteBuilder() {
return new RouteBuilder<Exchange>() {
public void configure() {
from("queue:a").choice()
@@ -102,11 +75,9 @@
.when(header("foo").isEqualTo("cheese")).to("queue:c")
.otherwise().to("queue:d");
- from("queue:b").to("queue:e");
- from("queue:c").to("queue:e");
- from("queue:d").to("queue:e");
-
- from("queue:e").process(processor);
+ from("queue:b").to("mock:result");
+ from("queue:c").to("mock:result");
+ from("queue:d").to("mock:result");
}
};
}
@@ -114,6 +85,6 @@
@Override
protected void tearDown() throws Exception {
client.stop();
- container.stop();
+ context.stop();
}
}